diff options
author | Manel Caro <mcaro@iseebcn.com> | 2020-07-31 02:07:37 +0200 |
---|---|---|
committer | Manel Caro <mcaro@iseebcn.com> | 2020-07-31 02:07:37 +0200 |
commit | d46bce422fd03cd57d1ba336361da17d6efb48db (patch) | |
tree | e5ec7aa1ee5d53a655ce121a7c2ddd95888fc989 /test-cli | |
parent | 907b96801230e04d02575a3732a73e452089637b (diff) | |
download | board-d46bce422fd03cd57d1ba336361da17d6efb48db.zip board-d46bce422fd03cd57d1ba336361da17d6efb48db.tar.gz board-d46bce422fd03cd57d1ba336361da17d6efb48db.tar.bz2 |
TEST restructure
Diffstat (limited to 'test-cli')
33 files changed, 1512 insertions, 604 deletions
diff --git a/test-cli/__init__.py b/test-cli/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test-cli/__init__.py diff --git a/test-cli/setup.xml b/test-cli/setup.xml index 5220887..5cdd65b 100644 --- a/test-cli/setup.xml +++ b/test-cli/setup.xml @@ -1,7 +1,21 @@ <?xml version="1.0"?> <data> <setup> + <test_main qr_wait_time="30" autotest="0" OverrideAutoReboot="1" delayReboot="1" wait_test_start="240"/> <db dbname="testsrv" type="PgSQLConnection" host="192.168.60.3" port="5432" user="admin" password="Idkfa2009" /> + <qaudio play_dtmf_file="/root/hwtest-files/dtmf-13579.wav" to="/mnt/station_ramdisk" record_dtmf_file="dtmf_recorded_{}.wav" recordtime="2.00" rate="8000" aplay_run_delay="0.1"/> + <qusb loops="3" pathfrom="/root/hwtest-files" file="usb-test.bin" file_md5="usb-test.bin.md5" pathto="/mnt/test/disk2"/> + <qusb2 loops="3" pathfrom="/mnt/test/disk1" file="usb-test.bin" file_md5="usb-test.bin.md5" pathto="/nfs/station/tmp"/> + <qdmesg syslog_dmesg="/var/log/kern.log"/> + <qram loops="1" memsize="100M" memtesterPath="/root/hwtest-files/memtester" to="/mnt/station_ramdisk" ram_res_file="ram_res.txt"/> + <qnand nandtestPath="/root/hwtest-files/nandtest"/> + <NAND_Flasher name="NAND_Flasher" igep_flash="/usr/bin/igep-flash" skipnandtest="1"/> + <EEProm_Flasher name="EEPROM_Flasher"/> + <qwifi loops="1" interface="wlan0" bwexpected="5.0" port="5000" serverip="192.168.5.4" duration="10" to="/mnt/station_ramdisk" wifi_res_file="eth_test_{}.json" wifi_st_file="eth_st_{}.json" retries="5" wait_retry="10"/> + <qeth loops="1" interface="eth0" bwexpected="5.0" port="5000" serverip="192.168.60.3" duration="10" to="/mnt/station_ramdisk" eth_res_file="eth_test_{}.json" eth_st_file="eth_st_{}.json" retries="5" wait_retry="10"/> + <qeeprom i2c_address="0050" i2c_bus="0"/> + <qemmc device="mmcblk0" to="/mnt/station_ramdisk" emmc_res_file="emmc_status.txt"/> + <qplc firmware="" firmware_md5="" factory_tool="/root/hwtest-files/apps/configlayer" gen_ip="0" gen_mac="1" mtd_device="/dev/mtd0" factory_password="betera" firmware_password="paterna"/> </setup> <usb> <usbdev MANUFACTURER="Linux 4.9.81+ ohci_hcd" PRODUCT="OHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0001" ignore="1"/> diff --git a/test-cli/test/enums/StationStates.py b/test-cli/test/enums/StationStates.py index 040e6bc..88dba1f 100644 --- a/test-cli/test/enums/StationStates.py +++ b/test-cli/test/enums/StationStates.py @@ -18,4 +18,5 @@ class StationStates(Enum): WAITING_FOR_SCANNER = 50 FINISHED = 60 STATION_REBOOT = 2001 + STATION_WAIT_SHUTDOWN = 2002 diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py new file mode 100644 index 0000000..9b2829c --- /dev/null +++ b/test-cli/test/helpers/camara.py @@ -0,0 +1,124 @@ +import cv2 +from test.helpers.syscmd import SysCommand + +class Camara(object): + __parent = None + __device_name = None + __device = None + __w = 1280 + __h = 720 + __contrast = 0.0 + __brightness = 0.0 + __saturation = 55.0 + __hue = 0.0 + __exposure = 166 + + def __init__(self, parent, device="video0", width=1280, height=720): + self.__parent = parent + self.__device_name = device + self.__w = width + self.__h = height + + def Close(self): + if self.__device is not None: + del self.__device + self.__device = None + + def Open(self): + self.Close() + self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name)) + if self.__device.isOpened(): + self.__configure() + return True + return False + + def getSize(self): + return self.__w, self.__h + + def setSize(self, w, h): + if self.__device is not None and self.__device.isOpened(): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h) + else: + self.__w = w + self.__h = h + + def setContrast(self, newVal): + if self.__device.isOpened(): + self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal) + else: + self.__contrast = newVal + + def getContrast(self): + return self.__contrast + + def setBrightness(self, newVal): + if self.__device.isOpened(): + self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal) + else: + self.__brightness = newVal + + def getBrightness(self): + return self.__brightness + + def __configure(self): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h) + cam_setup = SysCommand('v4lsetup', '{}/scripts/v4l-cam.sh'.format(self.__parent.getAppPath())) + cam_setup.execute() + + #self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, self.__contrast) + #self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, self.__brightness) + #self.__saturation = self.__setCamVar(cv2.CAP_PROP_SATURATION, self.__saturation) + #self.__hue = self.__setCamVar(cv2.CAP_PROP_HUE, self.__hue) + #self.__exposure = self.__setCamVar(cv2.CAP_PROP_EXPOSURE, self.__exposure) + + + def __setCamVar(self, key, val): + valold = cv2.VideoCapture.get(self.__device, key) + if valold != val: + cv2.VideoCapture.set(self.__device, key, val) + t = cv2.VideoCapture.get(self.__device, key) + return t + return val + + def __getCamVar(self, key): + return cv2.VideoCapture.get(self.__device, key); + + def getFrameCount(self): + return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE); + + def getFrame(self): + if self.__device.isOpened(): + retval, image = self.__device.read() + if retval: + return image + return None + else: + return None + + def getImageSize(self, image): + if hasattr(image, 'shape'): + return image.shape[1], image.shape[0] + else: + return (0, 0, 0) + + def showFrame(self, name, frame, w=False, maxTime=3000): + cv2.imshow(name, frame) + if w: + if maxTime == -1: + cv2.waitKey() + else: + cv2.waitKey(maxTime) + + def saveFrame(self, fileName, Frame): + cv2.imwrite(fileName, Frame) + + def readImage(self, filename): + return cv2.imread('{}'.format(filename), 0) + + def destroyWindow(self, name): + cv2.destroyWindow(name) + + def closeWindows(self): + cv2.destroyAllWindows()
\ No newline at end of file diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py index fad81ac..db94a1c 100644 --- a/test-cli/test/helpers/cmdline.py +++ b/test-cli/test/helpers/cmdline.py @@ -1,5 +1,5 @@ -class LinuxKernel: +class LinuxKernelCmd: __kernel_vars = {} diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py new file mode 100644 index 0000000..f127f2b --- /dev/null +++ b/test-cli/test/helpers/gpio.py @@ -0,0 +1,55 @@ +import os + +class gpio (object): + gpioNum = None + + def __init__(self, gpioNum, dir, val): + self.gpioNum = gpioNum + self.__export(gpioNum) + self.__set_dir(gpioNum, dir) + if dir == 'out': + self.__set_value(gpioNum, val) + + def set_val(self, value): + self.__set_value( self.gpioNum, value) + + + def __export(self, gpio_number): + if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/export", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __unexport(self, gpio_number): + if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/unexport", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_dir(self, gpio_number, dir): + try: + f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n") + val = f.readline() + if val != dir: + f.write(dir) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_value(self, gpio_number, value): + try: + f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n") + f.write(str(value)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True
\ No newline at end of file diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py index d081853..133387c 100644 --- a/test-cli/test/helpers/int_registers.py +++ b/test-cli/test/helpers/int_registers.py @@ -22,6 +22,12 @@ def read(addr): os.close(fd) return "%08X" % retval[0] +def imx8m_readid(): + f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n") + val = f.readline() + f.close() + return val.rstrip() + def get_die_id(modelid): dieid = "" @@ -36,9 +42,10 @@ def get_die_id(modelid): registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] elif modelid.find("OMAP5") == 0: registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] + elif modelid.find("IGEP0048") == 0: + return imx8m_readid() else: - raise Exception('modelid not defined') - + raise Exception('modelid not defined: {}, modelid') for rg in registers: dieid = dieid + read(rg) return dieid diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py index 785a78c..4b1087c 100644 --- a/test-cli/test/helpers/iseelogger.py +++ b/test-cli/test/helpers/iseelogger.py @@ -1,6 +1,6 @@ import logging import logging.handlers - +import datetime class ISEE_Logger(object): __logger = None @@ -37,3 +37,26 @@ class ISEE_Logger(object): def getlogger(self): return self.__logger +class MeasureTime: + __difference = None + __first_time = None + __later_time = None + + def __init__(self): + self.__first_time = datetime.datetime.now() + + def start(self): + self.first_time = datetime.datetime.now() + + def stop(self): + self.__later_time = datetime.datetime.now() + self.__difference = self.__later_time - self.__first_time + return self.__difference.total_seconds() + + def getTime(self): + return self.__difference.total_seconds() + + +global logObj +logObj = ISEE_Logger(logging.INFO) + diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py new file mode 100644 index 0000000..8e29448 --- /dev/null +++ b/test-cli/test/helpers/iw.py @@ -0,0 +1,124 @@ +from sh import iw +from scanf import scanf + +class iw_scan: + __interface = None + __raw_unparsed = [] + __raw = None + + def __init__(self, interface='wlan0'): + self.__interface = interface + + def scan(self): + self.__raw_unparsed = [] + self.__raw = iw('{}'.format(self.__interface), 'scan') + if self.__raw.exit_code == 0: + self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n') + return True + return False + + def getRawData (self): + return self.__raw_unparsed + + def getInterface(self): + return self.__interface + + def getRawObject(self): + return self.__raw + + +class iwScan: + __iw_scan = None + __station_list = {} + + def __init__(self, interface='wlan0'): + self.__iw_scan = iw_scan(interface) + + def scan(self): + if self.__iw_scan.scan(): + self.__parse_scan(self.__iw_scan.getRawData()) + return True + return False + + def getLastError(self): + rObject = self.__iw_scan.getRawObject() + if rObject.exit_code != 0: + return rObject.stderr + return '' + + def __Parse_BBS(self, line): + data = {} + res = scanf("BSS %s(on %s) -- %s", line) + if res == None: + res = scanf("BSS %s(on %s)", line) + data["MAC"] = res[0] + data["DEV"] = res[1] + if len(res) == 3: + data["ASSOCIATED"] = 'YES' + else: + data["ASSOCIATED"] = 'NO' + return data + + def __Parse_T(self, id, OnNotFoundAttr ,line): + data = {} + res = scanf("{}: %s".format(id), line) + if res == None: + data[id.upper()] = OnNotFoundAttr + else: + data[id.upper()] = res[0] + return data + + def __Parse_X(self, line): + data = {} + res = scanf("DS Parameter set: channel %s", line) + if res == None: + data['CHANNEL'] = '-1' + else: + data['CHANNEL'] = res[0] + return data + + def __block_stationlist(self, rawdata): + list = {} + count = 0 + for line in rawdata: + idx = line.find('BSS') + if idx != -1 and idx == 0: + if len(list) > 0: + count += 1 + self.__station_list[count] = list + list = self.__Parse_BBS(line) + elif line.find('SSID:') != -1: + list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)} + elif line.find('freq:') != -1: + list = {**list, **self.__Parse_T('freq', '', line)} + elif line.find('signal:') != -1: + list = {**list, **self.__Parse_T('signal', '', line)} + elif line.find('DS Parameter set:') != -1: + list = {**list, **self.__Parse_X(line)} + if count > 0: + count += 1 + self.__station_list[count] = list + + def __parse_scan(self, rawData): + self.__station_list = {} + self.__block_stationlist(rawData) + #print('{}'.format(self.__station_list)) + #for i in self.__station_list: + # print(self.__station_list[i]) + + def findConnected (self): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('ASSOCIATED', 'NO') == 'YES': + return True, self.__station_list[i] + return False, None + + def findBySSID(self, ssid): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('SSID', '') == ssid: + return True, self.__station_list[i] + return False, None + + def getStations(self): + return self.__station_list
\ No newline at end of file diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py new file mode 100644 index 0000000..9bc9e23 --- /dev/null +++ b/test-cli/test/helpers/md5.py @@ -0,0 +1,8 @@ +import hashlib + +def md5_file(fname): + hash_md5 = hashlib.md5() + with open(fname, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest()
\ No newline at end of file diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py new file mode 100644 index 0000000..3b00934 --- /dev/null +++ b/test-cli/test/helpers/plc.py @@ -0,0 +1,57 @@ +import sh +from sh import flashcp +from sh import flash_erase +from sh import ErrorReturnCode +from sh import Command +from test.helpers.gpio import gpio + + +class dcpPLC(object): + __nReset = None + __Phy = None + __Plc = None + __mtd_device = None + __factool = None + __myConfigTool = None + + def __init__(self, plcfactool, mtd_device): + # default save firmware + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + self.__factool = plcfactool + self.__mtd_device = mtd_device + self.__myConfigTool = Command(self.__factool) + + def setSaveFirmwareMode(self): + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + + def setBootMode(self): + self.__nRest = gpio('78', 'out', '0') + self.__nRest = gpio('75', 'out', '1') + self.__nRest = gpio('69', 'out', '1') + + def SaveFirmware(self, firmare): + self.setSaveFirmwareMode() + try: + flash_erase(self.__mtd_device) + flashcp(firmare, self.__mtd_device) + except ErrorReturnCode as Error: + return False, "plc flash firmware failed {} ".format(Error.exit_code) + return True, '' + + def set_plc (self, var, value, password): + try: + self.__myConfigTool("-o", "SET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password)) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var,Error.exit_code) + return True, '' + + def get_plc (self, var, value, password): + try: + self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password)) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var,Error.exit_code) + return True, '' diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py index c98338a..e703c3a 100644 --- a/test-cli/test/helpers/psqldb.py +++ b/test-cli/test/helpers/psqldb.py @@ -31,6 +31,9 @@ class PgSQLConnection(object): print(error) return result + def getConfig(self): + return self.__db_config + def db_execute_query(self, query): cur = self.__conection_object.cursor() cur.execute(query) diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py index 744db54..f663e99 100644 --- a/test-cli/test/helpers/qrreader.py +++ b/test-cli/test/helpers/qrreader.py @@ -4,6 +4,7 @@ import threading import time import selectors from selectors import DefaultSelector, EVENT_READ +from test.helpers.iseelogger import logObj selector = selectors.DefaultSelector() @@ -37,7 +38,9 @@ class QRReader: def getQRlist(self): devices = [evdev.InputDevice(path) for path in evdev.list_devices()] + logObj.getlogger().debug("{}".format(devices)) for device in devices: + logObj.getlogger().debug("{}".format(device.name)) if device.name in qrdevice_list: self.__myReader['NAME'] = device.name # print(self.__myReader['NAME']) diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py new file mode 100644 index 0000000..8f55d9f --- /dev/null +++ b/test-cli/test/helpers/sdl.py @@ -0,0 +1,126 @@ +import sdl2.ext +from sdl2 import * +import os +import time + +Colors = { + 'red': sdl2.ext.Color(255, 0, 0, 255), + 'green': sdl2.ext.Color(0, 255, 0, 255), + 'blue': sdl2.ext.Color(0, 0, 255, 255), + 'white': sdl2.ext.Color(255, 255, 255, 255), + 'black': sdl2.ext.Color(0, 0, 0, 255), +} + + +class SDL2(object): + __resources = None + __w = 1024 + __h = 768 + __winName = "noname" + __window = None + __factory = None + __renderer = None + __srender = None + + def __init__(self, parent): + os.environ['SDL_VIDEODRIVER'] = "x11" + os.environ['DISPLAY'] = ":0" + self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files') + sdl2.ext.init() + self.__createWindow() + + def __createWindow(self): + self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0), + flags=( + SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED)) + self.__renderer = sdl2.ext.Renderer(self.__window) + self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer) + self.__srender = self.__factory.create_sprite_render_system(self.__window) + self.__window.show() + SDL_ShowCursor(SDL_DISABLE) + + def createFont(self, fontName, fontSize, fontColor): + return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor) + + def WriteText(self, text, x, y, fontManager): + imText = self.__factory.from_text(text, fontmanager=fontManager) + self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1])) + + def show(self): + self.__window.show() + + def hide(self): + self.__window.hide() + + def setColor(self, red, green, blue, alpha): + self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha) + + def fillColor(self, red, green, blue, alpha): + self.setColor(red, green, blue, alpha) + self.__renderer.clear() + + def fillbgColor(self, color, update=False): + if color in Colors: + self.__renderer.color = Colors[color] + self.__renderer.clear() + if update: + self.update() + + def update(self): + self.__renderer.present() + + def showImage(self, filename): + im = self.__factory.from_image(self.__resources.get_path(filename)) + self.__srender.render(im) + + +class SDL2_Test(object): + __sdl2 = None + + def __init__(self, parent): + self.__sdl2 = SDL2(parent) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + +class SDL2_Message(object): + __sdl2 = None + __fontManager_w = None + __fontManager_b = None + + def __init__(self, parent): + self.__sdl2 = SDL2(parent) + self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white']) + self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black']) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + def __write_w(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_w) + + def __write_b(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_b) + + def setTestOK(self, board_uuid, qrdata): + self.Paint('GREEN') + self.__write_b(100, 50, 'TEST OK') + self.__write_b(100, 100, 'uuid={}'.format(board_uuid)) + self.__write_b(100, 150, 'qr={}'.format(qrdata)) + self.__sdl2.update() + + def setTestERROR(self, error): + self.Paint('RED') + self.__write_w(100, 50, 'TEST FAILED') + self.__write_w(100, 100, '{}'.format(error)) + self.__sdl2.update() + diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py index 603a2ac..d61d1fb 100644 --- a/test-cli/test/helpers/setup_xml.py +++ b/test-cli/test/helpers/setup_xml.py @@ -1,6 +1,5 @@ import xml.etree.ElementTree as XMLParser - class XMLSetup (object): """XML Setup Parser""" __tree = None # Parser diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 9579e7e..c2b2276 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -24,6 +24,9 @@ class TestSrv_Database(object): self.__sqlObject = PgSQLConnection() return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr()) + def getConfig(self): + return self.__sqlObject.getConfig() + def create_board(self, processor_id, model_id, variant, station, bmac=None): '''create a new board''' if bmac is None: @@ -41,7 +44,8 @@ class TestSrv_Database(object): return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - # print(r) + print(r) + print(str(err)) return None def get_tests_list(self, board_uuid): @@ -95,10 +99,11 @@ class TestSrv_Database(object): sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult) # print('>>>' + sql) try: + print('finish_test => SQL {}'.format(sql)) self.__sqlObject.db_execute_query(sql) except Exception as err: r = find_between(str(err), '#', '#') - # print(r) + print('finish_test => {}'.format(err)) return None def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype): @@ -195,6 +200,18 @@ class TestSrv_Database(object): def change_station_state(self, station, newstate): sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station) + print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + print(r) + return None + + def get_setup_variable(self, skey): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) @@ -205,7 +222,7 @@ class TestSrv_Database(object): # print(r) return None - def get_setup_variable(self, skey): + def get_setup_variable(self, skey , default): sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) # print('>>>' + sql) try: @@ -215,4 +232,4 @@ class TestSrv_Database(object): except Exception as err: r = find_between(str(err), '#', '#') # print(r) - return None + return default diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py new file mode 100644 index 0000000..8d2c27b --- /dev/null +++ b/test-cli/test/helpers/utils.py @@ -0,0 +1,68 @@ +import json +import socket +import os +import scanf + +def save_json_to_disk(filePath, description, mime, json_data, result): + with open(filePath, 'w') as outfile: + json.dump(json_data, outfile, indent=4) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_file_to_disk(filePath, description, mime, data, result): + with open(filePath, 'w') as outfile: + outfile.write(data) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_result (filePath, description, mime, result): + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + + +def str2bool(v): + return v.lower() in ("yes", "true", "t", "1") + +def station2Port(base): + Name = socket.gethostname() + res = scanf.scanf("Station%d", Name) + if res[0]: + NmBase = int(base) + res = int(res[0]) + return str(NmBase + res) + return base + + +def sys_read(sysnode): + try: + f = open(sysnode, "r") + data = f.readline() + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + +def removefileIfExist(fname): + if os.path.exists(fname): + try: + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 2eb61a4..ece5a4e 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -5,6 +5,7 @@ Simple Test Runner for unittest module import sys import unittest +from test.helpers.iseelogger import logObj class SimpleTestRunner: @@ -59,28 +60,34 @@ class TextTestResult(unittest.TestResult): def addError(self, test, err): unittest.TestResult.addError(self, test, err) - # test.longMessage = err[1] + test.longMessage = err[1] self.result = self.ERROR - print(err[1]) + print(err) def addFailure(self, test, err): unittest.TestResult.addFailure(self, test, err) test.longMessage = err[1] self.result = self.FAIL + logObj.getlogger().info('{}:{}'.format(test, err)) def stopTest(self, test): - unittest.TestResult.stopTest(self, test) - # display: print test result - self.runner.writeUpdate(self.result) - # save result data - for result in test.getresults(): - self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], - result["description"], result["filepath"], result["mimetype"]) - # SEND TO DB THE RESULT OF THE TEST - if self.result == self.PASS: - status = "TEST_COMPLETE" - resulttext = test.gettextresult() - else: - status = "TEST_FAILED" - resulttext = test.longMessage - self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext) + try: + unittest.TestResult.stopTest(self, test) + # display: print test result + self.runner.writeUpdate(self.result) + # save result data + for result in test.getresults(): + self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], + result["description"], result["filepath"], result["mimetype"]) + # SEND TO DB THE RESULT OF THE TEST + if self.result == self.PASS: + status = "TEST_COMPLETE" + resulttext = test.gettextresult() + logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext)) + else: + status = "TEST_FAILED" + resulttext = test.longMessage + logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext)) + self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext) + except Exception as E: + logObj.getlogger().error('Exception: [{}]{}:{}:{}:{}'.format(E, test.params["testidctl"], test.params["testid"], status, resulttext))
\ No newline at end of file diff --git a/test-cli/test/tasks/flasheeprom.py b/test-cli/test/tasks/flasheeprom.py index feeb04c..775c556 100644 --- a/test-cli/test/tasks/flasheeprom.py +++ b/test-cli/test/tasks/flasheeprom.py @@ -2,55 +2,76 @@ import os import binascii from test.helpers.globalVariables import globalVar +class EEprom_Flasher: + __Name = None + __varlist = None + __xmlObj = None + __lastError = '' -def _generate_data_bytes(boarduuid, mac0, mac1): - data = bytearray() - data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E - data += bytearray([0, 0, 0, 0]) # crc32 = 0 - data += bytearray(boarduuid + "\0", + def __init__(self, varlist): + self.__varlist = varlist + self.__xmlObj = varlist['xml'] + self.__Name = varlist.get('name_eeprom', self.__xmlObj.getKeyVal('EEProm_Flasher', 'name', 'EEProm_Flasher')) + # self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash') + # self.__ImagePath = varlist.get('flashimagepath', '') + # self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1')) + + def getTaskName(self): + return self.__Name + + def getError(self): + return self.__lastError + + def Execute(self): + return True, None + + def __generate_data_bytes(self, boarduuid, mac0, mac1): + data = bytearray() + data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E + data += bytearray([0, 0, 0, 0]) # crc32 = 0 + data += bytearray(boarduuid + "\0", 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end - data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' - if mac1 is not None: - data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7' - else: - data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 - # calculate crc - crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') - data[4:8] = crc - return data - - -def _generate_output_data(data_rx): - boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql - mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii') - mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii') - smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2)) - smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2)) - eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1 - - return eepromdata - - -# returns exitcode and data saved into eeprom -def flash_eeprom(eeprompath, boarduuid, mac0, mac1=None): - print("Start programming Eeprom...") - # check if eeprompath is correct - if os.path.isfile(eeprompath): - # create u-boot data struct - data = _generate_data_bytes(boarduuid, mac0, mac1) - # write into eeprom and read back - f = open(eeprompath, "r+b") - f.write(data) - f.seek(0) - data_rx = f.read(57) - for i in range(57): - if data_rx[i] != data[i]: - print("Error while programming eeprom memory.") - return 1, None - print("Eeprom programmed succesfully.") - # generated eeprom read data - eepromdata = _generate_output_data(data_rx) - return 0, eepromdata - else: - print("Eeprom memory not found.") - return 1, None + data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' + if mac1 is not None: + data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7' + else: + data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 + # calculate crc + crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') + data[4:8] = crc + return data + + + def __generate_output_data(self, data_rx): + boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql + mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii') + mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii') + smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2)) + smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2)) + eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1 + return eepromdata + + + # returns exitcode and data saved into eeprom + def flash_eeprom(self, eeprompath, boarduuid, mac0, mac1=None): + print("Start programming Eeprom...") + # check if eeprompath is correct + if os.path.isfile(eeprompath): + # create u-boot data struct + data = self.__generate_data_bytes(boarduuid, mac0, mac1) + # write into eeprom and read back + f = open(eeprompath, "r+b") + f.write(data) + f.seek(0) + data_rx = f.read(57) + for i in range(57): + if data_rx[i] != data[i]: + print("Error while programming eeprom memory.") + return 1, None + print("Eeprom programmed succesfully.") + # generated eeprom read data + eepromdata = self.__generate_output_data(data_rx) + return 0, eepromdata + else: + print("Eeprom memory not found.") + return 1, None diff --git a/test-cli/test/tasks/flashmemory.py b/test-cli/test/tasks/flashmemory.py index 3be56d7..ad9d92a 100644 --- a/test-cli/test/tasks/flashmemory.py +++ b/test-cli/test/tasks/flashmemory.py @@ -1,12 +1,44 @@ import sh +import os +class NAND_Flasher: + __Name = None + __varlist = None + __xmlObj = None + __igepflashPath = None + __lastError = '' + + def __init__(self, varlist): + self.__varlist = varlist + self.__xmlObj = varlist['xml'] + self.__Name = varlist.get('name_flashnand', self.__xmlObj.getKeyVal('NAND_Flasher', 'name', 'NAND_Flasher')) + self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash') + self.__ImagePath = varlist.get('flashimagepath', '') + self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1')) + + def getTaskName(self): + return self.__Name + + def getError(self): + return self.__lastError + + def Execute(self): + try: + self.__lastError = '' + if not os.path.isfile(self.__ImagePath): + self.__lastError('Not Image Found: {}'.format(self.__ImagePath)) + return False, None + if self.__SkipNandtest == '1': + p = sh.bash('{}'.format(self.__igepflashPath), "--skip-nandtest", "--image", self.__ImagePath) + else: + p = sh.bash('{}'.format(self.__igepflashPath), "--image", self.__ImagePath) + if p.exit_code != 0: + return False, None + except OSError as e: + self.__lastError('Exception: {}'.format(e.strerror)) + return False, None + except Exception as Error: + self.__lastError('Exception: {}'.format(Error)) + return False, None + return True, None -def flash_memory(imagepath): - print("Start programming Nand memory...") - p = sh.bash("/usr/bin/igep-flash", "--skip-nandtest", "--image", imagepath) - if p.exit_code != 0: - print("Flasher: Could not complete flashing task.") - return 1 - else: - print("Flasher: NAND memory flashed succesfully.") - return 0 diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 58054c9..df340eb 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -5,6 +5,7 @@ from test.helpers.amper import Amper class Qamper(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries + dummytest = False # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): @@ -21,8 +22,28 @@ class Qamper(unittest.TestCase): raise Exception('overcurrent param inside Qamp must be defined') self._testMethodDoc = testname self.__resultlist = [] + if "dummytest" in varlist: + self.dummytest = varlist["dummytest"] + + def saveresultinfile(self, currentvalue): + # save result in a file + with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: + n = outfile.write("Current: {} A".format(currentvalue)) + outfile.close() + self.__resultlist.append( + { + "description": "Amperimeter values", + "filepath": "/mnt/station_ramdisk/amper.txt", + "mimetype": "text/plain" + } + ) def execute(self): + if self.dummytest: + self.current = "0.0" + self.saveresultinfile(self.current) + return + amp = Amper() # open serial connection if not amp.open(): @@ -35,16 +56,7 @@ class Qamper(unittest.TestCase): # get current value (in Amperes) self.current = amp.getCurrent() # save result in a file - with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: - n = outfile.write("Current: {} A".format(self.current)) - outfile.close() - self.__resultlist.append( - { - "description": "Amperimeter values", - "filepath": "/mnt/station_ramdisk/amper.txt", - "mimetype": "text/plain" - } - ) + self.saveresultinfile(self.current) # close serial connection amp.close() # Check current range diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index 3d2113a..2c86809 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,56 +2,78 @@ import unittest import sh import wave import contextlib - - -def calc_audio_duration(fname): - with contextlib.closing(wave.open(fname, 'r')) as f: - frames = f.getnframes() - rate = f.getframerate() - duration = frames / float(rate) - f.close() - return duration - +import uuid +import time +from test.helpers.utils import save_result +from test.helpers.iseelogger import logObj class Qaudio(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries __dtmf_secuence = ["1", "3", "5", "7", "9"] + __dtmf_file = None + __file_uuid = None + __QaudioName = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qaudio, self).__init__(testfunc) self._testMethodDoc = testname self.__resultlist = [] + self.__xmlObj = varlist["xml"] + self.__QaudioName = varlist.get('name', 'qaudio') + self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav")) + self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15")) + self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav")) + self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000")) + self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk")) + self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0"))) + self.__file_uuid = uuid.uuid4() + self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid) + + def restoreAlsaCtl(self): + try: + sh.alsactl('restore'); + except Exception as E: + logObj.getlogger().error("Failed to Execite alsactl restore"); def execute(self): - # analize audio file - recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 + self.restoreAlsaCtl() + try: + # analize audio file + calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1) + except TypeError as Error: + self.fail("failed: TypeError: {}.".Error) + # previous play to estabilise audio lines - p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) - p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) - p1.wait() - p2.wait() - # play and record - p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) - p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) - p1.wait() + p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True) + time.sleep(self.__run_delay) + p1 = sh.aplay(self.__dtmf_file_play) p2.wait() if p1.exit_code == 0 and p2.exit_code == 0: - p = sh.multimon("-t", "wav", "-a", "DTMF", "/mnt/station_ramdisk/recorded.wav", "-q") + p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q") if p.exit_code == 0: lines = p.stdout.decode('ascii').splitlines() self.__dtmf_secuence_result = [] for li in lines: if li.split(" ")[0] == "DTMF:": self.__dtmf_secuence_result.append(li.split(" ")[1]) + self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result))) # compare original and processed dtmf sequence if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result): if a != b: - self.fail("failed: sent and received DTMF sequence don't match.") + logObj.getlogger().debug( + "failed: sent and received DTMF sequence do not match.{} != {}".format( + self.__dtmf_secuence, self.__dtmf_secuence_result)) + self.fail("failed: sent and received DTMF sequence do not match.") + save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record), + description='sound record', + mime='audio/x-wav', + result=self.__resultlist) else: - self.fail("failed: received DTMF sequence is shorter than expected.") + logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result)) + self.fail("received DTMF sequence is shorter than expected") else: self.fail("failed: unable to use multimon command.") else: @@ -62,3 +84,12 @@ class Qaudio(unittest.TestCase): def gettextresult(self): return "" + + def calc_audio_duration(self, fname): + duration = 0.0 + with contextlib.closing(wave.open(fname, 'r')) as f: + frames = f.getnframes() + rate = f.getframerate() + duration = frames / float(rate) + f.close() + return duration diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py index b35f1ff..39a5047 100644 --- a/test-cli/test/tests/qdmesg.py +++ b/test-cli/test/tests/qdmesg.py @@ -1,4 +1,5 @@ import sh +import os import os.path from os import path @@ -9,24 +10,23 @@ class Qdmesg: def __init__(self, testname, testfunc, varlist): self.params = varlist - self._testMethodDoc = testname + self.__testMethodDoc = testname self.__resultlist = [] self.pgObj = varlist["db"] + self.__xmlObj = varlist["xml"] + self.__QdmesgName = varlist.get('name', 'qdmesg') + self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg", + "/var/log/kern.log")) + + def getTestName(self): + return self.__testMethodDoc def execute(self): - print("running dmesg test") self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) - # delete previous file - if path.exists("/mnt/station_ramdisk/dmesg.txt"): - os.remove("/mnt/station_ramdisk/dmesg.txt") - # generate file - p = sh.dmesg("--color=never", _out="/mnt/station_ramdisk/dmesg.txt") - if p.exit_code == 0: - # save dmesg result in DB - self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output", - "/mnt/station_ramdisk/dmesg.txt", "text/plain") - self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") - print("success dmesg test") - else: + if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)): self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "") - print("fail dmesg test") + return False + self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file), + "{}".format(self.__syslog_dmesg_file), "text/plain") + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") + return True diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 7fc9fcd..7900381 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -1,46 +1,65 @@ -import sh import unittest - +import os class Qeeprom(unittest.TestCase): params = None - __position = None - __eeprompath = None __resultlist = None # resultlist is a python list of python dictionaries + __QEepromName = None + __i2cBus = None + __device = None # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEepromName = varlist.get('name', 'qeeprom') + self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050")) + self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0")) + self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress) + self.__resultlist = [] - if "position" in varlist: - self.__position = varlist["position"] - else: - raise Exception('position param inside Qeeprom must be defined') - if "eeprompath" in varlist: - self.__eeprompath = varlist["eeprompath"] - else: - raise Exception('eeprompath param inside Qeeprom must be defined') + + def __check_device(self): + if os.path.isfile(self.__device): + return True + return False + + def __eeprom_write(self, data): + try: + f = open(self.__device, "wb") + f.write(data) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, '' + + def __eeprom_read(self, size): + try: + f = open(self.__device, "rb") + data = f.read(size) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + def execute(self): - # generate some data - data_tx = "isee_test" - # write data into the eeprom - p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) - if p.exit_code == 0: - # read data from the eeprom - p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + str(len(data_tx))) - if p.exit_code == 0: - data_rx = p.stdout.decode('ascii') - # compare both values - if data_rx != data_tx: - # Both data are different - self.fail("failed: mismatch between written and received values.") - else: - self.fail("failed: Unable to read from the EEPROM device.") - else: - self.fail("failed: Unable to write on the EEPROM device.") + if not self.__check_device(): + self.fail("cannot open device {}".format(self.__device)) + + data = bytes('AbcDeFgHijK098521', 'utf-8') + + v, w = self.__eeprom_write(data) + if not v: + self.fail("eeprom: {} write test failed".format(w)) + v, r = self.__eeprom_read(len(data)) + if not v: + self.fail("eeprom: {} read test failed".format(r)) + if r != data: + self.fail("mismatch between write and read bytes") + def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 81acef1..7d081a1 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -2,7 +2,11 @@ import unittest import sh import json import time - +import uuid +import iperf3 +import netifaces +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qethernet(unittest.TestCase): __serverip = None @@ -16,29 +20,66 @@ class Qethernet(unittest.TestCase): # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): - # save the parameters list self.params = varlist - # configure the function to be executed when the test runs. "testfunc" is a name of a method inside this - # class, that in this situation, it can only be "execute". super(Qethernet, self).__init__(testfunc) - # validate and get the parameters - if "serverip" in varlist: - self.__serverip = varlist["serverip"] - else: - raise Exception('sip param inside Qethernet have been be defined') - if "bwexpected" in varlist: - self.__bwexpected = varlist["bwexpected"] - else: - raise Exception('bwexpected param inside Qethernet must be defined') - if "port" in varlist: - self.__port = varlist["port"] - else: - raise Exception('port param inside Qethernet must be defined') - self.__numbytestx = "10M" self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEthName = varlist.get('name', 'qeth') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json")) + self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) + self.__resultlist = [] + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False + def execute(self): + self.__resultlist = [] + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='eth {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return + else: + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) + else: + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qEth (max tries) Execution error: {}'.format(result.error)) + + def execute2(self): # execute iperf command against the server, but it implements attempts in case the server is busy iperfdone = False while not iperfdone: diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py index f10757e..0f5a0c1 100644 --- a/test-cli/test/tests/qmmcflash.py +++ b/test-cli/test/tests/qmmcflash.py @@ -1,5 +1,10 @@ import unittest -import sh +import scanf +from scanf import scanf +from sh import mmc +from sh import ErrorReturnCode +from test.helpers.utils import save_file_to_disk +from test.helpers.utils import sys_read class Qmmcflash(unittest.TestCase): params = None @@ -8,8 +13,43 @@ class Qmmcflash(unittest.TestCase): self.params = varlist super(Qmmcflash, self).__init__(testfunc) self._testMethodDoc = testname - # TODO + self.__xmlObj = varlist["xml"] + self.__QeMMCName = varlist.get('name', 'qemmc') + self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk")) + self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt")) + self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0")) + self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001")) + + self.__resultlist = [] def execute(self): - # TODO
\ No newline at end of file + try: + dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file), + description='eMMC health test', + mime='text/plain', + data=dataOut.stdout.decode('utf-8'), + result=self.__resultlist) + sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID) + r, data = sys_read("{}/life_time".format(sysDevice)) + if not r: + self.fail("emmc: life_time not found") + res = scanf("0x%d 0x%d", data) + if res[0] > 3 or res[1] > 3: + self.fail("emmc: review {} life_time > 3".format(sysDevice)) + r, data = sys_read("{}/pre_eol_info".format(sysDevice)) + if not r: + self.fail("emmc: pre_eol_info not found") + res = scanf("0x%d", data) + if res[0] != 1: + self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice)) + except ErrorReturnCode as Error: + self.fail("emmc: failed {} ".format(Error.exit_code)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return ""
\ No newline at end of file diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 988ab60..6de1eaf 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -1,6 +1,7 @@ import unittest import sh - +import uuid +from test.helpers.utils import save_file_to_disk class Qnand(unittest.TestCase): params = None @@ -11,29 +12,34 @@ class Qnand(unittest.TestCase): def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qnand, self).__init__(testfunc) - if "device" in varlist: - self.__device = varlist["device"] - else: - raise Exception('device param inside Qnand must be defined') self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QNandName = varlist.get('name', 'qnand') + self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0")) + self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk")) + self.__file_uuid = uuid.uuid4() + self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid) + nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"') + + if nandtestPath == '': + self.myNandTest = sh.nandtest + else: + self.myNandTest = sh.Command(nandtestPath) self.__resultlist = [] def execute(self): try: - p = sh.nandtest("-m", self.__device) - # save result - with open('/mnt/station_ramdisk/nand-nandtest.txt', 'w') as outfile: - n = outfile.write(p.stdout.decode('ascii')) - outfile.close() - self.__resultlist.append( - { - "description": "nandtest output", - "filepath": "/mnt/station_ramdisk/nand-nandtest.txt", - "mimetype": "text/plain" - } - ) - except sh.ErrorReturnCode as e: - self.fail("failed: could not complete nandtest command") + res = self.myNandTest("-m", self.__device) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file), + description='nand {} test'.format(self.__device), + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + + except sh.ErrorReturnCode_1 as e: + self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout)) + def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py new file mode 100644 index 0000000..01258a9 --- /dev/null +++ b/test-cli/test/tests/qplc.py @@ -0,0 +1,70 @@ +import shutil +import unittest +import os.path +import os +import sh + +from test.helpers.md5 import md5_file +from test.helpers.plc import dcpPLC + +class Qplc(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QPLCName = None + __plc = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qplc, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + + self.__QPLCName = varlist.get('name', 'qplc') + self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", "")) + self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", "")) + self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", "")) + self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0")) + self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0")) + self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0")) + self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device) + + self.__resultlist = [] + + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def __flashMemory(self): + self.__plc.setSaveFirmwareMode() + r, v = self.__plc.SaveFirmware(self.__firmware) + if not r: + self.fail(v) + + def execute(self): + # Check files + v, m = self.checkfiles() + if not v: + self.fail(m) + # do flash & verify + self.__flashMemory() + # do Plc boot + self.__plc.setBootMode() + # Wait plc goes UP + + + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 21a01c8..49d4d54 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,33 +1,59 @@ import unittest import sh - +# from test.helpers.iseelogger import MeasureTime +# from test.helpers.iseelogger import logObj +from test.helpers.utils import save_file_to_disk class Qram(unittest.TestCase): params = None - __memsize = "10M" - __loops = "1" - __resultlist = None # resultlist is a python list of python dictionaries + __memsize = None + __loops = None + __resultlist = [] # resultlist is a python list of python dictionaries # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qram, self).__init__(testfunc) - if "memsize" in varlist: - self.__memsize = varlist["memsize"] - else: - raise Exception('memsize param inside Qram must be defined') - if "loops" in varlist: - self.__loops = varlist["loops"] - else: - raise Exception('loops param inside Qram must be defined') self._testMethodDoc = testname - self.__resultlist = [] + self.__xmlObj = varlist["xml"] + + self.__QramName = varlist.get('name', 'qram') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1")) + self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk")) + self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt")) + + self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + + memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '') + + if memtesterPath == '': + self.myMemtester = sh.memtester + else: + self.myMemtester = sh.Command(memtesterPath) def execute(self): + self.__resultlist = [] try: - p = sh.memtester(self.__memsize, "1", _out="/dev/null") + # mytime = MeasureTime() + res = self.myMemtester(self.__memsize, '{}'.format(self.__loops)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + # mytime.stop() except sh.ErrorReturnCode as e: - self.fail("failed: could not complete memtester command") + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=e.stdout.decode('utf-8'), + result=self.__resultlist) + self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8'))) + + except Exception as details: + self.fail('Error: {}'.format(details)) def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 6302012..3182685 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,91 +1,90 @@ -import sh +import shutil import unittest -from test.helpers.usb import USBDevices import os.path +import os +import sh +from test.helpers.md5 import md5_file class Qusb(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries + __QusbName = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) self._testMethodDoc = testname - if "repetitions" in varlist: - self.__repetitions = varlist["repetitions"] - else: - raise Exception('repetitions param inside Qusb must be defined') + self.__xmlObj = varlist["xml"] + + self.__QusbName = varlist.get('name', 'qusb') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3")) + self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files")) + self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin")) + self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5")) + self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2")) + self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", "")) + self.__resultlist = [] - def execute(self): - # get usb device - dev_obj = USBDevices(self.params["xml"]) - if dev_obj.getMassStorage(): - device = dev_obj.getMassStorage()['disk'] + "1" - else: - self.fail("failed: No USB memory found.") - # check if the device is mounted, and umount it - try: - p = sh.findmnt("-n", device) - if p.exit_code == 0: - sh.umount(device) - except sh.ErrorReturnCode as e: - # error = 1 means "no found" - pass - # mount the device - sh.mkdir("-p", "/mnt/station_ramdisk/pendrive") - p = sh.mount(device, "/mnt/station_ramdisk/pendrive") - if p.exit_code != 0: - self.fail("failed: Unable to mount the USB memory device.") - # execute test - for i in range(int(self.__repetitions)): - # copy files + def removefileIfExist(self, fname): + if os.path.exists(fname): try: - p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", - "/var/lib/hwtest-files/usbdatatest.bin.md5", - "/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode as e: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: Unable to copy files to the USB memory device.") - # check if the device is still mounted - if not os.path.ismount("/mnt/station_ramdisk/pendrive"): - sh.rm("/mnt/station_ramdisk/pendrive/*") - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: USB device unmounted during/after copying files.") - # check MD5 - try: - p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin") - except sh.ErrorReturnCode as e: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: Unable to calculate MD5 of the copied file.") - newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile: - oldmd5 = outfile.read().rstrip("\n") - outfile.close() - if newmd5 != oldmd5: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: MD5 check failed.") - # delete copied files - sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5") - # Finish + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def getTestFile_md5(self, fname): + t = '' + with open('{}'.format(fname), 'r') as outfile: + t = outfile.read().rstrip("\n") + outfile.close() + return t + + def ExecuteTranfer(self): try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") + originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5)) + if originalMD5 == '': + self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5)) + # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.sync() + md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source)) + self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if originalMD5 != md5val: + return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val) + except OSError as why: + return False,'Error: {} tranfer failed'.format(why.errno) + except Exception as details: + return False, 'USB Exception: {} tranfer failed'.format(details) + return True, '' + + def execute(self): + v, m = self.checkfiles() + if not v: + self.fail(m) + countLoops = int(self.__loops) + while countLoops > 0: + res, msg = self.ExecuteTranfer() + if not res: + res, msg = self.ExecuteTranfer() + if not res: + self.fail(msg) + countLoops = countLoops - 1 def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py deleted file mode 100644 index 05b22c3..0000000 --- a/test-cli/test/tests/qusbdual.py +++ /dev/null @@ -1,90 +0,0 @@ -import sh -import unittest -import os.path -import time - - -class Qusbdual(unittest.TestCase): - params = None - __resultlist = None # resultlist is a python list of python dictionaries - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qusbdual, self).__init__(testfunc) - self._testMethodDoc = testname - if "repetitions" in varlist: - self.__repetitions = varlist["repetitions"] - else: - raise Exception('repetitions param inside Qusbdual must be defined') - self.__resultlist = [] - - def execute(self): - # check if file-as-filesystem exists - if not os.path.isfile('/var/lib/hwtest-files/mass-otg-test.img'): - self.fail("failed: Unable to find file-as-filesystem image.") - # generate mass storage gadget - p = sh.modprobe("g_mass_storage", "file=/var/lib/hwtest-files/mass-otg-test.img") - if p.exit_code != 0: - self.fail("failed: Unable to create a mass storage gadget.") - # wait to detect the new device - time.sleep(3) - # find the mass storage device - try: - p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux") - except sh.ErrorReturnCode: - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: could not find any mass storage gadget") - device = p.stdout.decode().split(" ")[0] - # mount the mass storage gadget - sh.mkdir("-p", "/mnt/station_ramdisk/hdd_gadget") - p = sh.mount("-o", "ro", "/dev/" + device, "/mnt/station_ramdisk/hdd_gadget") - if p.exit_code != 0: - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to mount the mass storage gadget.") - # execute test - for i in range(int(self.__repetitions)): - # copy files - try: - p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin", - "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5", - "/mnt/station_nfsdisk/") - except sh.ErrorReturnCode as e: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to copy files through USB.") - # check if the device is still mounted - if not os.path.ismount("/mnt/station_ramdisk/hdd_gadget"): - sh.rm("/mnt/station_ramdisk/hdd_gadget/*") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: USB device unmounted during/after copying files.") - # Check md5 - try: - p = sh.md5sum("/mnt/station_nfsdisk/usb-test.bin") - except sh.ErrorReturnCode as e: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to calculate MD5 of the copied file.") - newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5', 'r') as outfile: - oldmd5 = outfile.read().rstrip("\n") - outfile.close() - if newmd5 != oldmd5: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: MD5 check failed.") - # delete copied files - sh.rm("-f", "/mnt/station_nfsdisk/usb-test.bin", "/mnt/station_nfsdisk/usb-test.bin.md5") - # Finish - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - - def getresults(self): - return self.__resultlist - - def gettextresult(self): - return "" diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 3dd9e38..4695041 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,9 +1,11 @@ import unittest -import sh -import re -import json import time - +import uuid +import iperf3 +import netifaces +from test.helpers.iw import iwScan +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qwifi(unittest.TestCase): __serverip = None @@ -19,81 +21,76 @@ class Qwifi(unittest.TestCase): def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qwifi, self).__init__(testfunc) - if "serverip" in varlist: - self.__serverip = varlist["serverip"] - else: - raise Exception('serverip param inside Qwifi have been be defined') - if "bwexpected" in varlist: - self.__bwexpected = varlist["bwexpected"] - else: - raise Exception('OKBW param inside Qwifi must be defined') - if "port" in varlist: - self.__port = varlist["port"] - else: - raise Exception('port param inside Qwifi must be defined') - self.__numbytestx = "10M" self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QwifiName = varlist.get('name', 'qwifi') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json")) + self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) self.__resultlist = [] - def execute(self): - # check if the board is connected to the router by wifi - p = sh.iw("wlan0", "link") - if p.exit_code == 0: - # get the first line of the output stream - out1 = p.stdout.decode('ascii').splitlines()[0] - if out1 != "Not connected.": - # check if the board has ip in the wlan0 interface - p = sh.ifconfig("wlan0") - if p.exit_code == 0: - # check if wlan0 has an IP - result = re.search( - 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', - p.stdout.decode('ascii')) - if result: - # execute iperf command against the server, but it implements attempts in case the server is busy - iperfdone = False - while not iperfdone: - try: - p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, - "-J", _timeout=20) - iperfdone = True - except sh.TimeoutException: - self.fail("failed: iperf timeout reached") - except sh.ErrorReturnCode: - time.sleep(self.timebetweenattempts) - - # check if it was executed succesfully - if p.exit_code == 0: - if p.stdout == "": - self.fail("failed: error executing iperf command") - # analyze output string - data = json.loads(p.stdout.decode('ascii')) - self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 - # save result file - with open('/mnt/station_ramdisk/wifi-iperf3.json', 'w') as outfile: - json.dump(data, outfile, indent=4) - outfile.close() - self.__resultlist.append( - { - "description": "iperf3 output", - "filepath": "/mnt/station_ramdisk/wifi-iperf3.json", - "mimetype": "application/json" - } - ) + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False - # check if BW is in the expected range - if self.__bwreal < float(self.__bwexpected): - self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal)) - else: - self.fail("failed: could not complete iperf3 command") - else: - self.fail("failed: wlan0 interface doesn't have any ip address.") + def execute(self): + self.__resultlist = [] + stationList = {} + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + # scan networks + scanner = iwScan(self.__interface) + if scanner.scan(): + stationList = scanner.getStations() + # check if we are connected + if not scanner.findConnected(): + self.fail('Not connected to test AP') + else: + strError = scanner.getLastError() + self.fail('qWifi scanner Execution error: {}'.format(strError)) + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file), + description='wifi scan', + mime='application/json', + json_data=stationList, + result=self.__resultlist) + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='wifi {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return else: - self.fail("failed: could not complete ifconfig command.") + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) else: - self.fail("failed: wifi module is not connected to the router.") - else: - self.fail("failed: could not execute iw command") + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qWifi (max tries) Execution error: {}'.format(result.error)) def getresults(self): return self.__resultlist diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 942df61..3f52152 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,8 +1,17 @@ +import time +import sh +import os +import unittest +import socket +import logging +import sys +import getopt +import random + +from datetime import datetime from test.helpers.int_registers import get_die_id from test.helpers.int_registers import get_internal_mac from subprocess import call -import os -import unittest from test.helpers.testsrv_db import TestSrv_Database from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner @@ -11,7 +20,6 @@ from test.tests.qram import Qram from test.tests.qusb import Qusb from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial -from test.tests.qusbdual import Qusbdual from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex @@ -19,25 +27,28 @@ from test.tests.qamper import Qamper from test.tests.qnand import Qnand from test.tests.qaudio import Qaudio from test.tests.qdmesg import Qdmesg +from test.tests.qmmcflash import Qmmcflash +from test.tests.qplc import Qplc from test.helpers.globalVariables import globalVar -import socket -from test.helpers.iseelogger import ISEE_Logger -import logging -from test.tasks.flasheeprom import flash_eeprom -from test.tasks.flashmemory import flash_memory +from test.tasks.flasheeprom import EEprom_Flasher +from test.tasks.flashmemory import NAND_Flasher from test.helpers.qrreader import QRReader -from test.helpers.cmdline import LinuxKernel +from test.helpers.cmdline import LinuxKernelCmd from test.helpers.amper import Amper from test.enums.StationStates import StationStates -import time -import sh +from test.helpers.mydebugger import sentdebugmessage +import xml.etree.ElementTree as XMLParser +from test.helpers.iseelogger import logObj +from test.helpers.utils import str2bool # global variables psdbObj = TestSrv_Database() xmlObj = None -loggerObj = None test_abspath = None tests_manually_executed = [] +AutoTest = False +OverrideAutoReboot = False +delay_reboot = '0' # define clear function @@ -45,74 +56,62 @@ def clear(): # check and make call for specific operating system _ = call('clear' if os.name == 'posix' else 'cls') +def reboot_board(): + logObj.getlogger().debug("#reboot board#") + if not OverrideAutoReboot: + set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name) + if delay_reboot == '0': + time.sleep(5) + sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True) + # sh.reboot(_bg=True) + print("REBOOT...") + exit(0) -def execute_station_error(text): - print("Error: {}".format(text)) - loggerObj.getlogger().info("Station error: #{}#".format(text)) +def wait_for_reboot(): while True: currentstate = psdbObj.read_station_state(globalVar.station) if currentstate == StationStates.STATION_REBOOT.name: reboot_board() - time.sleep(1) + time.sleep(5) +def execute_station_error(text): + logObj.getlogger().error("Station error: #{}#".format(text)) + wait_for_reboot() def check_first_state(): currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - elif currentstate == StationStates.STATION_REBOOT.name: - reboot_board() + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + logObj.getlogger().debug('station state during the first check is {}'.format(currentstate)) + wait_for_reboot() else: - starttime = time.time() + count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240")) while currentstate != StationStates.WAIT_TEST_START.name: - if time.time() - starttime >= 30.0: + logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate)) + if count_t <= 0: # Error - execute_station_error("Timeout while waiting for WAIT_TEST_START state") + execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate)) else: + count_t = count_t - 5 time.sleep(5) currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - elif currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - - -def reboot_board(): - sh.reboot() - exit(0) + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + wait_for_reboot() def set_next_state(newstate): statewritten = psdbObj.change_station_state(globalVar.station, newstate) - if statewritten == StationStates.STATION_REBOOT.name: - reboot_board() - elif statewritten == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - + if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name: + wait_for_reboot() def reboot_if_autotest(): # reset board if AUTOTEST is enabled - autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if int(autotest) == 1: + if AutoTest: reboot_board() def create_paramslist(params): paramlist = {} - for row in params: - varname, varvalue = row + for varname, varvalue in params: paramlist[varname] = varvalue return paramlist @@ -151,13 +150,17 @@ def add_test(suite, testdefname, paramlist): elif testdefname == "AUDIO": suite.addTest(Qaudio(testdefname, "execute", paramlist)) return 0 - elif testdefname == "USBLOOP": - suite.addTest(Qusbdual(testdefname, "execute", paramlist)) - return 0 elif testdefname == "DMESG": tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) return 0 + elif testdefname == "MMCFLASH": + suite.addTest(Qmmcflash(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "PLC_MODEM": + suite.addTest(Qplc(testdefname, "execute", paramlist)) + return 0 else: + logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname)) return 1 @@ -166,9 +169,9 @@ def create_testsuite(): suite1 = unittest.TestSuite() # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) + logObj.getlogger().debug("Create TestSuite: {}".format(tests)) # loop in every test for this board - for row in tests: - testid, testdefname = row + for testid, testdefname in tests: # get params for this test params = psdbObj.get_test_params_list(testid) paramlist = create_paramslist(params) @@ -176,6 +179,8 @@ def create_testsuite(): paramlist["testid"] = testid paramlist["boarduuid"] = globalVar.g_uuid paramlist["testidctl"] = globalVar.testid_ctl + paramlist['testPath'] = test_abspath + logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist)) paramlist["xml"] = xmlObj paramlist["db"] = psdbObj # add test to TestSuite @@ -185,44 +190,123 @@ def create_testsuite(): def create_board(): - cmd = LinuxKernel() + + cmd = LinuxKernelCmd() model_id = cmd.getkvar("bmodel", "none") variant = cmd.getkvar("bvariant", "none") + logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant)) if model_id == "none" or variant == "none": # Error - execute_station_error("Cannot get model and variant information") + execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant)) # get model id globalVar.g_mid = model_id + "-" + variant + logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid)) # get station if "Station" not in globalVar.station: # Error - execute_station_error("Wrong station name") + execute_station_error("Hostname not defined") processor_id = get_die_id(globalVar.g_mid) - print(globalVar.g_mid) - print(processor_id) globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, get_internal_mac(globalVar.g_mid)) - print(globalVar.g_uuid) + + logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid)) def get_taskvars_list(uuid): varlist = {} - for row in psdbObj.get_task_variables_list(uuid): - varname, varvalue = row + for varname, varvalue in psdbObj.get_task_variables_list(uuid): varlist[varname] = varvalue return varlist +def execute_extra_task(): + # list of task + myTask = [] + # Change state to "EXTRATASKS_RUNNING" + set_next_state(StationStates.EXTRATASKS_RUNNING.name) + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) + # get extra variables + varlist = get_taskvars_list(globalVar.g_uuid) + + # Put default Vars + varlist["boarduuid"] = globalVar.g_uuid + varlist["testidctl"] = globalVar.testid_ctl + varlist['testPath'] = test_abspath + varlist["xml"] = xmlObj + varlist["db"] = psdbObj + + # Get list Extra Task and activation + eeprom = varlist.get('eeprom', '0') + flashNand = varlist.get('flashNAND', '0') + + if eeprom == '1': + myEEprom = EEprom_Flasher(varlist) + myTask.append(myEEprom) + if flashNand == '1': + myNAND = NAND_Flasher(varlist) + myTask.append(myNAND) + + taskresult = True + for task in myTask: + res, data = task.Execute() + if res: + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data) + logObj.getlogger().info("task: {} ok".format(task.getTaskName())) + else: + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data) + logObj.getlogger().info("task: {} failed".format(task.getTaskName())) + taskresult = False + + # check final taskresult => ok or fail + if taskresult: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + else: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + for task in myTask: + if task.getError() != '': + execute_station_error("Unable to complete extra tasks: {}".format(task.getError())) + # Not emit a error message + execute_station_error("Unable to complete extra tasks") + +def getBarCode(fake): + + set_next_state(StationStates.WAITING_FOR_SCANNER.name) + if fake: + psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001))) + return + qrreceived = False + wait_count = psdbObj.get_setup_variable("QRWAIT_SEC", xmlObj.getKeyVal("test_main", "qr_wait_time", "120")) + while not qrreceived or int(wait_count) > 0: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(10): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + return + qr.closeQR() + else: + qrreceived = True + del qr + time.sleep(1) + wait_count = wait_count - 1 + if int(wait_count) == 0: + execute_station_error("Cannot find a barcode scanner") + -def main(): +def execute_test (): # initialize the board create_board() # create a process globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl)) + # Change state to "TESTS_RUNNING" set_next_state(StationStates.TESTS_RUNNING.name) @@ -230,71 +314,29 @@ def main(): suite1 = create_testsuite() # Execute tests (round 1) runner1 = SimpleTestRunner(psdbObj) + logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl)) testresult = runner1.run(suite1) + logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult)) # Execute manually tests for test in tests_manually_executed: - test.execute() + logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName())) + mtestResult = test.execute() + logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(), + 'ok' if mtestResult else 'fail')) # execute aditional tasks, only if the test was successful if testresult.wasSuccessful(): - # Change state to "TESTS_OK" + logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl)) set_next_state(StationStates.TESTS_OK.name) - # Change state to "EXTRATASKS_RUNNING" - set_next_state(StationStates.EXTRATASKS_RUNNING.name) - # create task control - globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) - # get extra variables - varlist = get_taskvars_list(globalVar.g_uuid) - - alltasksok = False - - # flash eeprom - resulteeprom = 0 - if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: - mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) - - # flash non-volatile memory - resultmemory = 0 - if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: - resultmemory = flash_memory(varlist["flashimagepath"]) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", - varlist["flashimagepath"]) - - # update status with the result - if resulteeprom == 0 and resultmemory == 0: - # finish tasks - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") - - # Change state to "WAITING_FOR_SCANNER" - set_next_state(StationStates.WAITING_FOR_SCANNER.name) - - # get barcode using the scanner, only if autotest is disabled - autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if int(autotest) == 1: - psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code - else: - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() - - # Change state to "FINISHED" - set_next_state(StationStates.FINISHED.name) - else: - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") - loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#") + logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl)) + execute_extra_task() + logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl)) + getBarCode(AutoTest) + set_next_state(StationStates.FINISHED.name) + logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl)) else: + logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl)) # Change state to "TESTS_FAILED" set_next_state(StationStates.TESTS_FAILED.name) @@ -302,28 +344,75 @@ def main(): reboot_if_autotest() +def enableFaulthandler(): + """ Enable faulthandler for all threads. + + If the faulthandler package is available, this function disables and then + re-enables fault handling for all threads (this is necessary to ensure any + new threads are handled correctly), and returns True. + + If faulthandler is not available, then returns False. + """ + try: + import faulthandler + # necessary to disable first or else new threads may not be handled. + faulthandler.disable() + faulthandler.enable(all_threads=True) + return True + except ImportError: + return False + if __name__ == "__main__": # Clear the shell screen clear() - test_abspath = os.path.dirname(os.path.abspath(__file__)) - # create logger - loggerObj = ISEE_Logger(logging.INFO) + enableFaulthandler() + + test_abspath = os.path.dirname(os.path.abspath(__file__)) + configFile = 'setup.xml' + loglevel = 'INFO' + logObj.getlogger().info("Test program Started : {}".format(datetime.now())) + try: + argv = sys.argv[1:] + opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel']) + for opt, args in opts: + if opt in ('-c', '--setup'): + configFile = args + if opt in ('-l', '--loglevel'): + loglevel = args + except getopt.GetoptError as err: + logObj.getlogger().error('#{}#'.format(err)) + + if loglevel != 'INFO': + logObj.setLogLevel(loglevel) # Try to parse the setup.xml file + logObj.getlogger().debug("parse config file: {}". format(configFile)) try: - xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml")) - except: + xmlObj = XMLSetup(os.path.join(test_abspath, configFile)) + except XMLParser.ParseError as error: # Error - execute_station_error("Cannot parse setup.xml file") + execute_station_error("Error parsing {} file {}".format(configFile, error.msg)) # Try to connect to the DB, according to setup.xml configuration if not psdbObj.open(xmlObj): # Error execute_station_error("Cannot open DB connection") + else: + logObj.getlogger().debug("database connection failed: {}".format(psdbObj.getConfig())) # get station name globalVar.station = socket.gethostname() + logObj.getlogger().info("Test Station: {}".format(globalVar.station)) + + if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1': + AutoTest = True + + OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0")) + delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0") + + logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot)) + # Check if current state is "WAIT_TEST_START". if not, waits here check_first_state() @@ -331,28 +420,7 @@ if __name__ == "__main__": # Change state to "TESTS_CHECKING_ENV" set_next_state(StationStates.TESTS_CHECKING_ENV.name) - # Wait before beginning - time.sleep(2) - - # Look for a barcode scanner - qr = QRReader() - if qr.openQR(): - qr.closeQR() - else: - # Error - execute_station_error("Cannot find a barcode scanner") - - # Look for an amperimeter - amp = Amper() - if not amp.open(): - # Error - execute_station_error("Cannot open an amperimeter COM port") - if not amp.hello(): - if not amp.hello(): - # Error - execute_station_error("Cannot open find an amperimeter") - - # Execute main - main() + # execute main program + execute_test() exit(0) |