diff options
author | Manel Caro <mcaro@iseebcn.com> | 2020-07-31 02:07:37 +0200 |
---|---|---|
committer | Manel Caro <mcaro@iseebcn.com> | 2020-07-31 02:07:37 +0200 |
commit | d46bce422fd03cd57d1ba336361da17d6efb48db (patch) | |
tree | e5ec7aa1ee5d53a655ce121a7c2ddd95888fc989 /test-cli/test/helpers | |
parent | 907b96801230e04d02575a3732a73e452089637b (diff) | |
download | board-d46bce422fd03cd57d1ba336361da17d6efb48db.zip board-d46bce422fd03cd57d1ba336361da17d6efb48db.tar.gz board-d46bce422fd03cd57d1ba336361da17d6efb48db.tar.bz2 |
TEST restructure
Diffstat (limited to 'test-cli/test/helpers')
-rw-r--r-- | test-cli/test/helpers/camara.py | 124 | ||||
-rw-r--r-- | test-cli/test/helpers/cmdline.py | 2 | ||||
-rw-r--r-- | test-cli/test/helpers/gpio.py | 55 | ||||
-rw-r--r-- | test-cli/test/helpers/int_registers.py | 11 | ||||
-rw-r--r-- | test-cli/test/helpers/iseelogger.py | 25 | ||||
-rw-r--r-- | test-cli/test/helpers/iw.py | 124 | ||||
-rw-r--r-- | test-cli/test/helpers/md5.py | 8 | ||||
-rw-r--r-- | test-cli/test/helpers/plc.py | 57 | ||||
-rw-r--r-- | test-cli/test/helpers/psqldb.py | 3 | ||||
-rw-r--r-- | test-cli/test/helpers/qrreader.py | 3 | ||||
-rw-r--r-- | test-cli/test/helpers/sdl.py | 126 | ||||
-rw-r--r-- | test-cli/test/helpers/setup_xml.py | 1 | ||||
-rw-r--r-- | test-cli/test/helpers/testsrv_db.py | 25 | ||||
-rw-r--r-- | test-cli/test/helpers/utils.py | 68 |
14 files changed, 623 insertions, 9 deletions
diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py new file mode 100644 index 0000000..9b2829c --- /dev/null +++ b/test-cli/test/helpers/camara.py @@ -0,0 +1,124 @@ +import cv2 +from test.helpers.syscmd import SysCommand + +class Camara(object): + __parent = None + __device_name = None + __device = None + __w = 1280 + __h = 720 + __contrast = 0.0 + __brightness = 0.0 + __saturation = 55.0 + __hue = 0.0 + __exposure = 166 + + def __init__(self, parent, device="video0", width=1280, height=720): + self.__parent = parent + self.__device_name = device + self.__w = width + self.__h = height + + def Close(self): + if self.__device is not None: + del self.__device + self.__device = None + + def Open(self): + self.Close() + self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name)) + if self.__device.isOpened(): + self.__configure() + return True + return False + + def getSize(self): + return self.__w, self.__h + + def setSize(self, w, h): + if self.__device is not None and self.__device.isOpened(): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h) + else: + self.__w = w + self.__h = h + + def setContrast(self, newVal): + if self.__device.isOpened(): + self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal) + else: + self.__contrast = newVal + + def getContrast(self): + return self.__contrast + + def setBrightness(self, newVal): + if self.__device.isOpened(): + self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal) + else: + self.__brightness = newVal + + def getBrightness(self): + return self.__brightness + + def __configure(self): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h) + cam_setup = SysCommand('v4lsetup', '{}/scripts/v4l-cam.sh'.format(self.__parent.getAppPath())) + cam_setup.execute() + + #self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, self.__contrast) + #self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, self.__brightness) + #self.__saturation = self.__setCamVar(cv2.CAP_PROP_SATURATION, self.__saturation) + #self.__hue = self.__setCamVar(cv2.CAP_PROP_HUE, self.__hue) + #self.__exposure = self.__setCamVar(cv2.CAP_PROP_EXPOSURE, self.__exposure) + + + def __setCamVar(self, key, val): + valold = cv2.VideoCapture.get(self.__device, key) + if valold != val: + cv2.VideoCapture.set(self.__device, key, val) + t = cv2.VideoCapture.get(self.__device, key) + return t + return val + + def __getCamVar(self, key): + return cv2.VideoCapture.get(self.__device, key); + + def getFrameCount(self): + return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE); + + def getFrame(self): + if self.__device.isOpened(): + retval, image = self.__device.read() + if retval: + return image + return None + else: + return None + + def getImageSize(self, image): + if hasattr(image, 'shape'): + return image.shape[1], image.shape[0] + else: + return (0, 0, 0) + + def showFrame(self, name, frame, w=False, maxTime=3000): + cv2.imshow(name, frame) + if w: + if maxTime == -1: + cv2.waitKey() + else: + cv2.waitKey(maxTime) + + def saveFrame(self, fileName, Frame): + cv2.imwrite(fileName, Frame) + + def readImage(self, filename): + return cv2.imread('{}'.format(filename), 0) + + def destroyWindow(self, name): + cv2.destroyWindow(name) + + def closeWindows(self): + cv2.destroyAllWindows()
\ No newline at end of file diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py index fad81ac..db94a1c 100644 --- a/test-cli/test/helpers/cmdline.py +++ b/test-cli/test/helpers/cmdline.py @@ -1,5 +1,5 @@ -class LinuxKernel: +class LinuxKernelCmd: __kernel_vars = {} diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py new file mode 100644 index 0000000..f127f2b --- /dev/null +++ b/test-cli/test/helpers/gpio.py @@ -0,0 +1,55 @@ +import os + +class gpio (object): + gpioNum = None + + def __init__(self, gpioNum, dir, val): + self.gpioNum = gpioNum + self.__export(gpioNum) + self.__set_dir(gpioNum, dir) + if dir == 'out': + self.__set_value(gpioNum, val) + + def set_val(self, value): + self.__set_value( self.gpioNum, value) + + + def __export(self, gpio_number): + if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/export", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __unexport(self, gpio_number): + if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/unexport", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_dir(self, gpio_number, dir): + try: + f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n") + val = f.readline() + if val != dir: + f.write(dir) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_value(self, gpio_number, value): + try: + f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n") + f.write(str(value)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True
\ No newline at end of file diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py index d081853..133387c 100644 --- a/test-cli/test/helpers/int_registers.py +++ b/test-cli/test/helpers/int_registers.py @@ -22,6 +22,12 @@ def read(addr): os.close(fd) return "%08X" % retval[0] +def imx8m_readid(): + f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n") + val = f.readline() + f.close() + return val.rstrip() + def get_die_id(modelid): dieid = "" @@ -36,9 +42,10 @@ def get_die_id(modelid): registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] elif modelid.find("OMAP5") == 0: registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] + elif modelid.find("IGEP0048") == 0: + return imx8m_readid() else: - raise Exception('modelid not defined') - + raise Exception('modelid not defined: {}, modelid') for rg in registers: dieid = dieid + read(rg) return dieid diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py index 785a78c..4b1087c 100644 --- a/test-cli/test/helpers/iseelogger.py +++ b/test-cli/test/helpers/iseelogger.py @@ -1,6 +1,6 @@ import logging import logging.handlers - +import datetime class ISEE_Logger(object): __logger = None @@ -37,3 +37,26 @@ class ISEE_Logger(object): def getlogger(self): return self.__logger +class MeasureTime: + __difference = None + __first_time = None + __later_time = None + + def __init__(self): + self.__first_time = datetime.datetime.now() + + def start(self): + self.first_time = datetime.datetime.now() + + def stop(self): + self.__later_time = datetime.datetime.now() + self.__difference = self.__later_time - self.__first_time + return self.__difference.total_seconds() + + def getTime(self): + return self.__difference.total_seconds() + + +global logObj +logObj = ISEE_Logger(logging.INFO) + diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py new file mode 100644 index 0000000..8e29448 --- /dev/null +++ b/test-cli/test/helpers/iw.py @@ -0,0 +1,124 @@ +from sh import iw +from scanf import scanf + +class iw_scan: + __interface = None + __raw_unparsed = [] + __raw = None + + def __init__(self, interface='wlan0'): + self.__interface = interface + + def scan(self): + self.__raw_unparsed = [] + self.__raw = iw('{}'.format(self.__interface), 'scan') + if self.__raw.exit_code == 0: + self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n') + return True + return False + + def getRawData (self): + return self.__raw_unparsed + + def getInterface(self): + return self.__interface + + def getRawObject(self): + return self.__raw + + +class iwScan: + __iw_scan = None + __station_list = {} + + def __init__(self, interface='wlan0'): + self.__iw_scan = iw_scan(interface) + + def scan(self): + if self.__iw_scan.scan(): + self.__parse_scan(self.__iw_scan.getRawData()) + return True + return False + + def getLastError(self): + rObject = self.__iw_scan.getRawObject() + if rObject.exit_code != 0: + return rObject.stderr + return '' + + def __Parse_BBS(self, line): + data = {} + res = scanf("BSS %s(on %s) -- %s", line) + if res == None: + res = scanf("BSS %s(on %s)", line) + data["MAC"] = res[0] + data["DEV"] = res[1] + if len(res) == 3: + data["ASSOCIATED"] = 'YES' + else: + data["ASSOCIATED"] = 'NO' + return data + + def __Parse_T(self, id, OnNotFoundAttr ,line): + data = {} + res = scanf("{}: %s".format(id), line) + if res == None: + data[id.upper()] = OnNotFoundAttr + else: + data[id.upper()] = res[0] + return data + + def __Parse_X(self, line): + data = {} + res = scanf("DS Parameter set: channel %s", line) + if res == None: + data['CHANNEL'] = '-1' + else: + data['CHANNEL'] = res[0] + return data + + def __block_stationlist(self, rawdata): + list = {} + count = 0 + for line in rawdata: + idx = line.find('BSS') + if idx != -1 and idx == 0: + if len(list) > 0: + count += 1 + self.__station_list[count] = list + list = self.__Parse_BBS(line) + elif line.find('SSID:') != -1: + list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)} + elif line.find('freq:') != -1: + list = {**list, **self.__Parse_T('freq', '', line)} + elif line.find('signal:') != -1: + list = {**list, **self.__Parse_T('signal', '', line)} + elif line.find('DS Parameter set:') != -1: + list = {**list, **self.__Parse_X(line)} + if count > 0: + count += 1 + self.__station_list[count] = list + + def __parse_scan(self, rawData): + self.__station_list = {} + self.__block_stationlist(rawData) + #print('{}'.format(self.__station_list)) + #for i in self.__station_list: + # print(self.__station_list[i]) + + def findConnected (self): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('ASSOCIATED', 'NO') == 'YES': + return True, self.__station_list[i] + return False, None + + def findBySSID(self, ssid): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('SSID', '') == ssid: + return True, self.__station_list[i] + return False, None + + def getStations(self): + return self.__station_list
\ No newline at end of file diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py new file mode 100644 index 0000000..9bc9e23 --- /dev/null +++ b/test-cli/test/helpers/md5.py @@ -0,0 +1,8 @@ +import hashlib + +def md5_file(fname): + hash_md5 = hashlib.md5() + with open(fname, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest()
\ No newline at end of file diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py new file mode 100644 index 0000000..3b00934 --- /dev/null +++ b/test-cli/test/helpers/plc.py @@ -0,0 +1,57 @@ +import sh +from sh import flashcp +from sh import flash_erase +from sh import ErrorReturnCode +from sh import Command +from test.helpers.gpio import gpio + + +class dcpPLC(object): + __nReset = None + __Phy = None + __Plc = None + __mtd_device = None + __factool = None + __myConfigTool = None + + def __init__(self, plcfactool, mtd_device): + # default save firmware + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + self.__factool = plcfactool + self.__mtd_device = mtd_device + self.__myConfigTool = Command(self.__factool) + + def setSaveFirmwareMode(self): + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + + def setBootMode(self): + self.__nRest = gpio('78', 'out', '0') + self.__nRest = gpio('75', 'out', '1') + self.__nRest = gpio('69', 'out', '1') + + def SaveFirmware(self, firmare): + self.setSaveFirmwareMode() + try: + flash_erase(self.__mtd_device) + flashcp(firmare, self.__mtd_device) + except ErrorReturnCode as Error: + return False, "plc flash firmware failed {} ".format(Error.exit_code) + return True, '' + + def set_plc (self, var, value, password): + try: + self.__myConfigTool("-o", "SET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password)) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var,Error.exit_code) + return True, '' + + def get_plc (self, var, value, password): + try: + self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password)) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var,Error.exit_code) + return True, '' diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py index c98338a..e703c3a 100644 --- a/test-cli/test/helpers/psqldb.py +++ b/test-cli/test/helpers/psqldb.py @@ -31,6 +31,9 @@ class PgSQLConnection(object): print(error) return result + def getConfig(self): + return self.__db_config + def db_execute_query(self, query): cur = self.__conection_object.cursor() cur.execute(query) diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py index 744db54..f663e99 100644 --- a/test-cli/test/helpers/qrreader.py +++ b/test-cli/test/helpers/qrreader.py @@ -4,6 +4,7 @@ import threading import time import selectors from selectors import DefaultSelector, EVENT_READ +from test.helpers.iseelogger import logObj selector = selectors.DefaultSelector() @@ -37,7 +38,9 @@ class QRReader: def getQRlist(self): devices = [evdev.InputDevice(path) for path in evdev.list_devices()] + logObj.getlogger().debug("{}".format(devices)) for device in devices: + logObj.getlogger().debug("{}".format(device.name)) if device.name in qrdevice_list: self.__myReader['NAME'] = device.name # print(self.__myReader['NAME']) diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py new file mode 100644 index 0000000..8f55d9f --- /dev/null +++ b/test-cli/test/helpers/sdl.py @@ -0,0 +1,126 @@ +import sdl2.ext +from sdl2 import * +import os +import time + +Colors = { + 'red': sdl2.ext.Color(255, 0, 0, 255), + 'green': sdl2.ext.Color(0, 255, 0, 255), + 'blue': sdl2.ext.Color(0, 0, 255, 255), + 'white': sdl2.ext.Color(255, 255, 255, 255), + 'black': sdl2.ext.Color(0, 0, 0, 255), +} + + +class SDL2(object): + __resources = None + __w = 1024 + __h = 768 + __winName = "noname" + __window = None + __factory = None + __renderer = None + __srender = None + + def __init__(self, parent): + os.environ['SDL_VIDEODRIVER'] = "x11" + os.environ['DISPLAY'] = ":0" + self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files') + sdl2.ext.init() + self.__createWindow() + + def __createWindow(self): + self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0), + flags=( + SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED)) + self.__renderer = sdl2.ext.Renderer(self.__window) + self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer) + self.__srender = self.__factory.create_sprite_render_system(self.__window) + self.__window.show() + SDL_ShowCursor(SDL_DISABLE) + + def createFont(self, fontName, fontSize, fontColor): + return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor) + + def WriteText(self, text, x, y, fontManager): + imText = self.__factory.from_text(text, fontmanager=fontManager) + self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1])) + + def show(self): + self.__window.show() + + def hide(self): + self.__window.hide() + + def setColor(self, red, green, blue, alpha): + self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha) + + def fillColor(self, red, green, blue, alpha): + self.setColor(red, green, blue, alpha) + self.__renderer.clear() + + def fillbgColor(self, color, update=False): + if color in Colors: + self.__renderer.color = Colors[color] + self.__renderer.clear() + if update: + self.update() + + def update(self): + self.__renderer.present() + + def showImage(self, filename): + im = self.__factory.from_image(self.__resources.get_path(filename)) + self.__srender.render(im) + + +class SDL2_Test(object): + __sdl2 = None + + def __init__(self, parent): + self.__sdl2 = SDL2(parent) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + +class SDL2_Message(object): + __sdl2 = None + __fontManager_w = None + __fontManager_b = None + + def __init__(self, parent): + self.__sdl2 = SDL2(parent) + self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white']) + self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black']) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + def __write_w(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_w) + + def __write_b(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_b) + + def setTestOK(self, board_uuid, qrdata): + self.Paint('GREEN') + self.__write_b(100, 50, 'TEST OK') + self.__write_b(100, 100, 'uuid={}'.format(board_uuid)) + self.__write_b(100, 150, 'qr={}'.format(qrdata)) + self.__sdl2.update() + + def setTestERROR(self, error): + self.Paint('RED') + self.__write_w(100, 50, 'TEST FAILED') + self.__write_w(100, 100, '{}'.format(error)) + self.__sdl2.update() + diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py index 603a2ac..d61d1fb 100644 --- a/test-cli/test/helpers/setup_xml.py +++ b/test-cli/test/helpers/setup_xml.py @@ -1,6 +1,5 @@ import xml.etree.ElementTree as XMLParser - class XMLSetup (object): """XML Setup Parser""" __tree = None # Parser diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 9579e7e..c2b2276 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -24,6 +24,9 @@ class TestSrv_Database(object): self.__sqlObject = PgSQLConnection() return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr()) + def getConfig(self): + return self.__sqlObject.getConfig() + def create_board(self, processor_id, model_id, variant, station, bmac=None): '''create a new board''' if bmac is None: @@ -41,7 +44,8 @@ class TestSrv_Database(object): return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - # print(r) + print(r) + print(str(err)) return None def get_tests_list(self, board_uuid): @@ -95,10 +99,11 @@ class TestSrv_Database(object): sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult) # print('>>>' + sql) try: + print('finish_test => SQL {}'.format(sql)) self.__sqlObject.db_execute_query(sql) except Exception as err: r = find_between(str(err), '#', '#') - # print(r) + print('finish_test => {}'.format(err)) return None def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype): @@ -195,6 +200,18 @@ class TestSrv_Database(object): def change_station_state(self, station, newstate): sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station) + print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + print(r) + return None + + def get_setup_variable(self, skey): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) @@ -205,7 +222,7 @@ class TestSrv_Database(object): # print(r) return None - def get_setup_variable(self, skey): + def get_setup_variable(self, skey , default): sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) # print('>>>' + sql) try: @@ -215,4 +232,4 @@ class TestSrv_Database(object): except Exception as err: r = find_between(str(err), '#', '#') # print(r) - return None + return default diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py new file mode 100644 index 0000000..8d2c27b --- /dev/null +++ b/test-cli/test/helpers/utils.py @@ -0,0 +1,68 @@ +import json +import socket +import os +import scanf + +def save_json_to_disk(filePath, description, mime, json_data, result): + with open(filePath, 'w') as outfile: + json.dump(json_data, outfile, indent=4) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_file_to_disk(filePath, description, mime, data, result): + with open(filePath, 'w') as outfile: + outfile.write(data) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_result (filePath, description, mime, result): + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + + +def str2bool(v): + return v.lower() in ("yes", "true", "t", "1") + +def station2Port(base): + Name = socket.gethostname() + res = scanf.scanf("Station%d", Name) + if res[0]: + NmBase = int(base) + res = int(res[0]) + return str(NmBase + res) + return base + + +def sys_read(sysnode): + try: + f = open(sysnode, "r") + data = f.readline() + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + +def removefileIfExist(fname): + if os.path.exists(fname): + try: + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' |