summaryrefslogtreecommitdiff
path: root/test-cli/test/helpers
diff options
context:
space:
mode:
Diffstat (limited to 'test-cli/test/helpers')
-rw-r--r--test-cli/test/helpers/camara.py124
-rw-r--r--test-cli/test/helpers/cmdline.py2
-rw-r--r--test-cli/test/helpers/gpio.py55
-rw-r--r--test-cli/test/helpers/int_registers.py11
-rw-r--r--test-cli/test/helpers/iseelogger.py25
-rw-r--r--test-cli/test/helpers/iw.py124
-rw-r--r--test-cli/test/helpers/md5.py8
-rw-r--r--test-cli/test/helpers/plc.py57
-rw-r--r--test-cli/test/helpers/psqldb.py3
-rw-r--r--test-cli/test/helpers/qrreader.py3
-rw-r--r--test-cli/test/helpers/sdl.py126
-rw-r--r--test-cli/test/helpers/setup_xml.py1
-rw-r--r--test-cli/test/helpers/testsrv_db.py25
-rw-r--r--test-cli/test/helpers/utils.py68
14 files changed, 623 insertions, 9 deletions
diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py
new file mode 100644
index 0000000..9b2829c
--- /dev/null
+++ b/test-cli/test/helpers/camara.py
@@ -0,0 +1,124 @@
+import cv2
+from test.helpers.syscmd import SysCommand
+
+class Camara(object):
+ __parent = None
+ __device_name = None
+ __device = None
+ __w = 1280
+ __h = 720
+ __contrast = 0.0
+ __brightness = 0.0
+ __saturation = 55.0
+ __hue = 0.0
+ __exposure = 166
+
+ def __init__(self, parent, device="video0", width=1280, height=720):
+ self.__parent = parent
+ self.__device_name = device
+ self.__w = width
+ self.__h = height
+
+ def Close(self):
+ if self.__device is not None:
+ del self.__device
+ self.__device = None
+
+ def Open(self):
+ self.Close()
+ self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name))
+ if self.__device.isOpened():
+ self.__configure()
+ return True
+ return False
+
+ def getSize(self):
+ return self.__w, self.__h
+
+ def setSize(self, w, h):
+ if self.__device is not None and self.__device.isOpened():
+ self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w)
+ self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h)
+ else:
+ self.__w = w
+ self.__h = h
+
+ def setContrast(self, newVal):
+ if self.__device.isOpened():
+ self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal)
+ else:
+ self.__contrast = newVal
+
+ def getContrast(self):
+ return self.__contrast
+
+ def setBrightness(self, newVal):
+ if self.__device.isOpened():
+ self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal)
+ else:
+ self.__brightness = newVal
+
+ def getBrightness(self):
+ return self.__brightness
+
+ def __configure(self):
+ self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w)
+ self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h)
+ cam_setup = SysCommand('v4lsetup', '{}/scripts/v4l-cam.sh'.format(self.__parent.getAppPath()))
+ cam_setup.execute()
+
+ #self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, self.__contrast)
+ #self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, self.__brightness)
+ #self.__saturation = self.__setCamVar(cv2.CAP_PROP_SATURATION, self.__saturation)
+ #self.__hue = self.__setCamVar(cv2.CAP_PROP_HUE, self.__hue)
+ #self.__exposure = self.__setCamVar(cv2.CAP_PROP_EXPOSURE, self.__exposure)
+
+
+ def __setCamVar(self, key, val):
+ valold = cv2.VideoCapture.get(self.__device, key)
+ if valold != val:
+ cv2.VideoCapture.set(self.__device, key, val)
+ t = cv2.VideoCapture.get(self.__device, key)
+ return t
+ return val
+
+ def __getCamVar(self, key):
+ return cv2.VideoCapture.get(self.__device, key);
+
+ def getFrameCount(self):
+ return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE);
+
+ def getFrame(self):
+ if self.__device.isOpened():
+ retval, image = self.__device.read()
+ if retval:
+ return image
+ return None
+ else:
+ return None
+
+ def getImageSize(self, image):
+ if hasattr(image, 'shape'):
+ return image.shape[1], image.shape[0]
+ else:
+ return (0, 0, 0)
+
+ def showFrame(self, name, frame, w=False, maxTime=3000):
+ cv2.imshow(name, frame)
+ if w:
+ if maxTime == -1:
+ cv2.waitKey()
+ else:
+ cv2.waitKey(maxTime)
+
+ def saveFrame(self, fileName, Frame):
+ cv2.imwrite(fileName, Frame)
+
+ def readImage(self, filename):
+ return cv2.imread('{}'.format(filename), 0)
+
+ def destroyWindow(self, name):
+ cv2.destroyWindow(name)
+
+ def closeWindows(self):
+ cv2.destroyAllWindows() \ No newline at end of file
diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py
index fad81ac..db94a1c 100644
--- a/test-cli/test/helpers/cmdline.py
+++ b/test-cli/test/helpers/cmdline.py
@@ -1,5 +1,5 @@
-class LinuxKernel:
+class LinuxKernelCmd:
__kernel_vars = {}
diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py
new file mode 100644
index 0000000..f127f2b
--- /dev/null
+++ b/test-cli/test/helpers/gpio.py
@@ -0,0 +1,55 @@
+import os
+
+class gpio (object):
+ gpioNum = None
+
+ def __init__(self, gpioNum, dir, val):
+ self.gpioNum = gpioNum
+ self.__export(gpioNum)
+ self.__set_dir(gpioNum, dir)
+ if dir == 'out':
+ self.__set_value(gpioNum, val)
+
+ def set_val(self, value):
+ self.__set_value( self.gpioNum, value)
+
+
+ def __export(self, gpio_number):
+ if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)):
+ try:
+ f = open("/sys/class/gpio/export", "w", newline="\n")
+ f.write(str(gpio_number))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __unexport(self, gpio_number):
+ if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)):
+ try:
+ f = open("/sys/class/gpio/unexport", "w", newline="\n")
+ f.write(str(gpio_number))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __set_dir(self, gpio_number, dir):
+ try:
+ f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n")
+ val = f.readline()
+ if val != dir:
+ f.write(dir)
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __set_value(self, gpio_number, value):
+ try:
+ f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n")
+ f.write(str(value))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True \ No newline at end of file
diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py
index d081853..133387c 100644
--- a/test-cli/test/helpers/int_registers.py
+++ b/test-cli/test/helpers/int_registers.py
@@ -22,6 +22,12 @@ def read(addr):
os.close(fd)
return "%08X" % retval[0]
+def imx8m_readid():
+ f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n")
+ val = f.readline()
+ f.close()
+ return val.rstrip()
+
def get_die_id(modelid):
dieid = ""
@@ -36,9 +42,10 @@ def get_die_id(modelid):
registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218]
elif modelid.find("OMAP5") == 0:
registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200]
+ elif modelid.find("IGEP0048") == 0:
+ return imx8m_readid()
else:
- raise Exception('modelid not defined')
-
+ raise Exception('modelid not defined: {}, modelid')
for rg in registers:
dieid = dieid + read(rg)
return dieid
diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py
index 785a78c..4b1087c 100644
--- a/test-cli/test/helpers/iseelogger.py
+++ b/test-cli/test/helpers/iseelogger.py
@@ -1,6 +1,6 @@
import logging
import logging.handlers
-
+import datetime
class ISEE_Logger(object):
__logger = None
@@ -37,3 +37,26 @@ class ISEE_Logger(object):
def getlogger(self):
return self.__logger
+class MeasureTime:
+ __difference = None
+ __first_time = None
+ __later_time = None
+
+ def __init__(self):
+ self.__first_time = datetime.datetime.now()
+
+ def start(self):
+ self.first_time = datetime.datetime.now()
+
+ def stop(self):
+ self.__later_time = datetime.datetime.now()
+ self.__difference = self.__later_time - self.__first_time
+ return self.__difference.total_seconds()
+
+ def getTime(self):
+ return self.__difference.total_seconds()
+
+
+global logObj
+logObj = ISEE_Logger(logging.INFO)
+
diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py
new file mode 100644
index 0000000..8e29448
--- /dev/null
+++ b/test-cli/test/helpers/iw.py
@@ -0,0 +1,124 @@
+from sh import iw
+from scanf import scanf
+
+class iw_scan:
+ __interface = None
+ __raw_unparsed = []
+ __raw = None
+
+ def __init__(self, interface='wlan0'):
+ self.__interface = interface
+
+ def scan(self):
+ self.__raw_unparsed = []
+ self.__raw = iw('{}'.format(self.__interface), 'scan')
+ if self.__raw.exit_code == 0:
+ self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n')
+ return True
+ return False
+
+ def getRawData (self):
+ return self.__raw_unparsed
+
+ def getInterface(self):
+ return self.__interface
+
+ def getRawObject(self):
+ return self.__raw
+
+
+class iwScan:
+ __iw_scan = None
+ __station_list = {}
+
+ def __init__(self, interface='wlan0'):
+ self.__iw_scan = iw_scan(interface)
+
+ def scan(self):
+ if self.__iw_scan.scan():
+ self.__parse_scan(self.__iw_scan.getRawData())
+ return True
+ return False
+
+ def getLastError(self):
+ rObject = self.__iw_scan.getRawObject()
+ if rObject.exit_code != 0:
+ return rObject.stderr
+ return ''
+
+ def __Parse_BBS(self, line):
+ data = {}
+ res = scanf("BSS %s(on %s) -- %s", line)
+ if res == None:
+ res = scanf("BSS %s(on %s)", line)
+ data["MAC"] = res[0]
+ data["DEV"] = res[1]
+ if len(res) == 3:
+ data["ASSOCIATED"] = 'YES'
+ else:
+ data["ASSOCIATED"] = 'NO'
+ return data
+
+ def __Parse_T(self, id, OnNotFoundAttr ,line):
+ data = {}
+ res = scanf("{}: %s".format(id), line)
+ if res == None:
+ data[id.upper()] = OnNotFoundAttr
+ else:
+ data[id.upper()] = res[0]
+ return data
+
+ def __Parse_X(self, line):
+ data = {}
+ res = scanf("DS Parameter set: channel %s", line)
+ if res == None:
+ data['CHANNEL'] = '-1'
+ else:
+ data['CHANNEL'] = res[0]
+ return data
+
+ def __block_stationlist(self, rawdata):
+ list = {}
+ count = 0
+ for line in rawdata:
+ idx = line.find('BSS')
+ if idx != -1 and idx == 0:
+ if len(list) > 0:
+ count += 1
+ self.__station_list[count] = list
+ list = self.__Parse_BBS(line)
+ elif line.find('SSID:') != -1:
+ list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)}
+ elif line.find('freq:') != -1:
+ list = {**list, **self.__Parse_T('freq', '', line)}
+ elif line.find('signal:') != -1:
+ list = {**list, **self.__Parse_T('signal', '', line)}
+ elif line.find('DS Parameter set:') != -1:
+ list = {**list, **self.__Parse_X(line)}
+ if count > 0:
+ count += 1
+ self.__station_list[count] = list
+
+ def __parse_scan(self, rawData):
+ self.__station_list = {}
+ self.__block_stationlist(rawData)
+ #print('{}'.format(self.__station_list))
+ #for i in self.__station_list:
+ # print(self.__station_list[i])
+
+ def findConnected (self):
+ for i in self.__station_list:
+ st = self.__station_list[i]
+ if st.get('ASSOCIATED', 'NO') == 'YES':
+ return True, self.__station_list[i]
+ return False, None
+
+ def findBySSID(self, ssid):
+ for i in self.__station_list:
+ st = self.__station_list[i]
+ if st.get('SSID', '') == ssid:
+ return True, self.__station_list[i]
+ return False, None
+
+ def getStations(self):
+ return self.__station_list \ No newline at end of file
diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py
new file mode 100644
index 0000000..9bc9e23
--- /dev/null
+++ b/test-cli/test/helpers/md5.py
@@ -0,0 +1,8 @@
+import hashlib
+
+def md5_file(fname):
+ hash_md5 = hashlib.md5()
+ with open(fname, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ return hash_md5.hexdigest() \ No newline at end of file
diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py
new file mode 100644
index 0000000..3b00934
--- /dev/null
+++ b/test-cli/test/helpers/plc.py
@@ -0,0 +1,57 @@
+import sh
+from sh import flashcp
+from sh import flash_erase
+from sh import ErrorReturnCode
+from sh import Command
+from test.helpers.gpio import gpio
+
+
+class dcpPLC(object):
+ __nReset = None
+ __Phy = None
+ __Plc = None
+ __mtd_device = None
+ __factool = None
+ __myConfigTool = None
+
+ def __init__(self, plcfactool, mtd_device):
+ # default save firmware
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ self.__nRest = gpio('78', 'out', '1')
+ self.__factool = plcfactool
+ self.__mtd_device = mtd_device
+ self.__myConfigTool = Command(self.__factool)
+
+ def setSaveFirmwareMode(self):
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ self.__nRest = gpio('78', 'out', '1')
+
+ def setBootMode(self):
+ self.__nRest = gpio('78', 'out', '0')
+ self.__nRest = gpio('75', 'out', '1')
+ self.__nRest = gpio('69', 'out', '1')
+
+ def SaveFirmware(self, firmare):
+ self.setSaveFirmwareMode()
+ try:
+ flash_erase(self.__mtd_device)
+ flashcp(firmare, self.__mtd_device)
+ except ErrorReturnCode as Error:
+ return False, "plc flash firmware failed {} ".format(Error.exit_code)
+ return True, ''
+
+ def set_plc (self, var, value, password):
+ try:
+ self.__myConfigTool("-o", "SET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password))
+ except ErrorReturnCode as Error:
+ return False, "set var failed {} {}".format(var,Error.exit_code)
+ return True, ''
+
+ def get_plc (self, var, value, password):
+ try:
+ self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password))
+ except ErrorReturnCode as Error:
+ return False, "set var failed {} {}".format(var,Error.exit_code)
+ return True, ''
diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py
index c98338a..e703c3a 100644
--- a/test-cli/test/helpers/psqldb.py
+++ b/test-cli/test/helpers/psqldb.py
@@ -31,6 +31,9 @@ class PgSQLConnection(object):
print(error)
return result
+ def getConfig(self):
+ return self.__db_config
+
def db_execute_query(self, query):
cur = self.__conection_object.cursor()
cur.execute(query)
diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py
index 744db54..f663e99 100644
--- a/test-cli/test/helpers/qrreader.py
+++ b/test-cli/test/helpers/qrreader.py
@@ -4,6 +4,7 @@ import threading
import time
import selectors
from selectors import DefaultSelector, EVENT_READ
+from test.helpers.iseelogger import logObj
selector = selectors.DefaultSelector()
@@ -37,7 +38,9 @@ class QRReader:
def getQRlist(self):
devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
+ logObj.getlogger().debug("{}".format(devices))
for device in devices:
+ logObj.getlogger().debug("{}".format(device.name))
if device.name in qrdevice_list:
self.__myReader['NAME'] = device.name
# print(self.__myReader['NAME'])
diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py
new file mode 100644
index 0000000..8f55d9f
--- /dev/null
+++ b/test-cli/test/helpers/sdl.py
@@ -0,0 +1,126 @@
+import sdl2.ext
+from sdl2 import *
+import os
+import time
+
+Colors = {
+ 'red': sdl2.ext.Color(255, 0, 0, 255),
+ 'green': sdl2.ext.Color(0, 255, 0, 255),
+ 'blue': sdl2.ext.Color(0, 0, 255, 255),
+ 'white': sdl2.ext.Color(255, 255, 255, 255),
+ 'black': sdl2.ext.Color(0, 0, 0, 255),
+}
+
+
+class SDL2(object):
+ __resources = None
+ __w = 1024
+ __h = 768
+ __winName = "noname"
+ __window = None
+ __factory = None
+ __renderer = None
+ __srender = None
+
+ def __init__(self, parent):
+ os.environ['SDL_VIDEODRIVER'] = "x11"
+ os.environ['DISPLAY'] = ":0"
+ self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files')
+ sdl2.ext.init()
+ self.__createWindow()
+
+ def __createWindow(self):
+ self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0),
+ flags=(
+ SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED))
+ self.__renderer = sdl2.ext.Renderer(self.__window)
+ self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer)
+ self.__srender = self.__factory.create_sprite_render_system(self.__window)
+ self.__window.show()
+ SDL_ShowCursor(SDL_DISABLE)
+
+ def createFont(self, fontName, fontSize, fontColor):
+ return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor)
+
+ def WriteText(self, text, x, y, fontManager):
+ imText = self.__factory.from_text(text, fontmanager=fontManager)
+ self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1]))
+
+ def show(self):
+ self.__window.show()
+
+ def hide(self):
+ self.__window.hide()
+
+ def setColor(self, red, green, blue, alpha):
+ self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha)
+
+ def fillColor(self, red, green, blue, alpha):
+ self.setColor(red, green, blue, alpha)
+ self.__renderer.clear()
+
+ def fillbgColor(self, color, update=False):
+ if color in Colors:
+ self.__renderer.color = Colors[color]
+ self.__renderer.clear()
+ if update:
+ self.update()
+
+ def update(self):
+ self.__renderer.present()
+
+ def showImage(self, filename):
+ im = self.__factory.from_image(self.__resources.get_path(filename))
+ self.__srender.render(im)
+
+
+class SDL2_Test(object):
+ __sdl2 = None
+
+ def __init__(self, parent):
+ self.__sdl2 = SDL2(parent)
+
+ def Clear(self):
+ self.__sdl2.fillbgColor('black', True)
+
+ def Paint(self, dcolor):
+ self.Clear()
+ self.__sdl2.fillbgColor(dcolor.lower(), True)
+
+
+class SDL2_Message(object):
+ __sdl2 = None
+ __fontManager_w = None
+ __fontManager_b = None
+
+ def __init__(self, parent):
+ self.__sdl2 = SDL2(parent)
+ self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white'])
+ self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black'])
+
+ def Clear(self):
+ self.__sdl2.fillbgColor('black', True)
+
+ def Paint(self, dcolor):
+ self.Clear()
+ self.__sdl2.fillbgColor(dcolor.lower(), True)
+
+ def __write_w(self, x, y, text):
+ self.__sdl2.WriteText(text, x, y, self.__fontManager_w)
+
+ def __write_b(self, x, y, text):
+ self.__sdl2.WriteText(text, x, y, self.__fontManager_b)
+
+ def setTestOK(self, board_uuid, qrdata):
+ self.Paint('GREEN')
+ self.__write_b(100, 50, 'TEST OK')
+ self.__write_b(100, 100, 'uuid={}'.format(board_uuid))
+ self.__write_b(100, 150, 'qr={}'.format(qrdata))
+ self.__sdl2.update()
+
+ def setTestERROR(self, error):
+ self.Paint('RED')
+ self.__write_w(100, 50, 'TEST FAILED')
+ self.__write_w(100, 100, '{}'.format(error))
+ self.__sdl2.update()
+
diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py
index 603a2ac..d61d1fb 100644
--- a/test-cli/test/helpers/setup_xml.py
+++ b/test-cli/test/helpers/setup_xml.py
@@ -1,6 +1,5 @@
import xml.etree.ElementTree as XMLParser
-
class XMLSetup (object):
"""XML Setup Parser"""
__tree = None # Parser
diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py
index 9579e7e..c2b2276 100644
--- a/test-cli/test/helpers/testsrv_db.py
+++ b/test-cli/test/helpers/testsrv_db.py
@@ -24,6 +24,9 @@ class TestSrv_Database(object):
self.__sqlObject = PgSQLConnection()
return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr())
+ def getConfig(self):
+ return self.__sqlObject.getConfig()
+
def create_board(self, processor_id, model_id, variant, station, bmac=None):
'''create a new board'''
if bmac is None:
@@ -41,7 +44,8 @@ class TestSrv_Database(object):
return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- # print(r)
+ print(r)
+ print(str(err))
return None
def get_tests_list(self, board_uuid):
@@ -95,10 +99,11 @@ class TestSrv_Database(object):
sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult)
# print('>>>' + sql)
try:
+ print('finish_test => SQL {}'.format(sql))
self.__sqlObject.db_execute_query(sql)
except Exception as err:
r = find_between(str(err), '#', '#')
- # print(r)
+ print('finish_test => {}'.format(err))
return None
def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype):
@@ -195,6 +200,18 @@ class TestSrv_Database(object):
def change_station_state(self, station, newstate):
sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station)
+ print('>>>' + sql)
+ try:
+ res = self.__sqlObject.db_execute_query(sql)
+ print(res)
+ return res[0][0]
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ print(r)
+ return None
+
+ def get_setup_variable(self, skey):
+ sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey)
# print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
@@ -205,7 +222,7 @@ class TestSrv_Database(object):
# print(r)
return None
- def get_setup_variable(self, skey):
+ def get_setup_variable(self, skey , default):
sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey)
# print('>>>' + sql)
try:
@@ -215,4 +232,4 @@ class TestSrv_Database(object):
except Exception as err:
r = find_between(str(err), '#', '#')
# print(r)
- return None
+ return default
diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py
new file mode 100644
index 0000000..8d2c27b
--- /dev/null
+++ b/test-cli/test/helpers/utils.py
@@ -0,0 +1,68 @@
+import json
+import socket
+import os
+import scanf
+
+def save_json_to_disk(filePath, description, mime, json_data, result):
+ with open(filePath, 'w') as outfile:
+ json.dump(json_data, outfile, indent=4)
+ outfile.close()
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+def save_file_to_disk(filePath, description, mime, data, result):
+ with open(filePath, 'w') as outfile:
+ outfile.write(data)
+ outfile.close()
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+def save_result (filePath, description, mime, result):
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+
+def str2bool(v):
+ return v.lower() in ("yes", "true", "t", "1")
+
+def station2Port(base):
+ Name = socket.gethostname()
+ res = scanf.scanf("Station%d", Name)
+ if res[0]:
+ NmBase = int(base)
+ res = int(res[0])
+ return str(NmBase + res)
+ return base
+
+
+def sys_read(sysnode):
+ try:
+ f = open(sysnode, "r")
+ data = f.readline()
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, data
+
+def removefileIfExist(fname):
+ if os.path.exists(fname):
+ try:
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''