summaryrefslogtreecommitdiff
path: root/test-cli
diff options
context:
space:
mode:
authorManel Caro <mcaro@iseebcn.com>2020-07-31 02:07:37 +0200
committerManel Caro <mcaro@iseebcn.com>2020-07-31 02:07:37 +0200
commitd46bce422fd03cd57d1ba336361da17d6efb48db (patch)
treee5ec7aa1ee5d53a655ce121a7c2ddd95888fc989 /test-cli
parent907b96801230e04d02575a3732a73e452089637b (diff)
downloadboard-d46bce422fd03cd57d1ba336361da17d6efb48db.zip
board-d46bce422fd03cd57d1ba336361da17d6efb48db.tar.gz
board-d46bce422fd03cd57d1ba336361da17d6efb48db.tar.bz2
TEST restructure
Diffstat (limited to 'test-cli')
-rw-r--r--test-cli/__init__.py0
-rw-r--r--test-cli/setup.xml14
-rw-r--r--test-cli/test/enums/StationStates.py1
-rw-r--r--test-cli/test/helpers/camara.py124
-rw-r--r--test-cli/test/helpers/cmdline.py2
-rw-r--r--test-cli/test/helpers/gpio.py55
-rw-r--r--test-cli/test/helpers/int_registers.py11
-rw-r--r--test-cli/test/helpers/iseelogger.py25
-rw-r--r--test-cli/test/helpers/iw.py124
-rw-r--r--test-cli/test/helpers/md5.py8
-rw-r--r--test-cli/test/helpers/plc.py57
-rw-r--r--test-cli/test/helpers/psqldb.py3
-rw-r--r--test-cli/test/helpers/qrreader.py3
-rw-r--r--test-cli/test/helpers/sdl.py126
-rw-r--r--test-cli/test/helpers/setup_xml.py1
-rw-r--r--test-cli/test/helpers/testsrv_db.py25
-rw-r--r--test-cli/test/helpers/utils.py68
-rw-r--r--test-cli/test/runners/simple.py41
-rw-r--r--test-cli/test/tasks/flasheeprom.py121
-rw-r--r--test-cli/test/tasks/flashmemory.py50
-rw-r--r--test-cli/test/tests/qamper.py32
-rw-r--r--test-cli/test/tests/qaudio.py77
-rw-r--r--test-cli/test/tests/qdmesg.py30
-rw-r--r--test-cli/test/tests/qeeprom.py77
-rw-r--r--test-cli/test/tests/qethernet.py77
-rw-r--r--test-cli/test/tests/qmmcflash.py46
-rw-r--r--test-cli/test/tests/qnand.py44
-rw-r--r--test-cli/test/tests/qplc.py70
-rw-r--r--test-cli/test/tests/qram.py56
-rw-r--r--test-cli/test/tests/qusb.py143
-rw-r--r--test-cli/test/tests/qusbdual.py90
-rw-r--r--test-cli/test/tests/qwifi.py143
-rw-r--r--test-cli/test_main.py372
33 files changed, 1512 insertions, 604 deletions
diff --git a/test-cli/__init__.py b/test-cli/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test-cli/__init__.py
diff --git a/test-cli/setup.xml b/test-cli/setup.xml
index 5220887..5cdd65b 100644
--- a/test-cli/setup.xml
+++ b/test-cli/setup.xml
@@ -1,7 +1,21 @@
<?xml version="1.0"?>
<data>
<setup>
+ <test_main qr_wait_time="30" autotest="0" OverrideAutoReboot="1" delayReboot="1" wait_test_start="240"/>
<db dbname="testsrv" type="PgSQLConnection" host="192.168.60.3" port="5432" user="admin" password="Idkfa2009" />
+ <qaudio play_dtmf_file="/root/hwtest-files/dtmf-13579.wav" to="/mnt/station_ramdisk" record_dtmf_file="dtmf_recorded_{}.wav" recordtime="2.00" rate="8000" aplay_run_delay="0.1"/>
+ <qusb loops="3" pathfrom="/root/hwtest-files" file="usb-test.bin" file_md5="usb-test.bin.md5" pathto="/mnt/test/disk2"/>
+ <qusb2 loops="3" pathfrom="/mnt/test/disk1" file="usb-test.bin" file_md5="usb-test.bin.md5" pathto="/nfs/station/tmp"/>
+ <qdmesg syslog_dmesg="/var/log/kern.log"/>
+ <qram loops="1" memsize="100M" memtesterPath="/root/hwtest-files/memtester" to="/mnt/station_ramdisk" ram_res_file="ram_res.txt"/>
+ <qnand nandtestPath="/root/hwtest-files/nandtest"/>
+ <NAND_Flasher name="NAND_Flasher" igep_flash="/usr/bin/igep-flash" skipnandtest="1"/>
+ <EEProm_Flasher name="EEPROM_Flasher"/>
+ <qwifi loops="1" interface="wlan0" bwexpected="5.0" port="5000" serverip="192.168.5.4" duration="10" to="/mnt/station_ramdisk" wifi_res_file="eth_test_{}.json" wifi_st_file="eth_st_{}.json" retries="5" wait_retry="10"/>
+ <qeth loops="1" interface="eth0" bwexpected="5.0" port="5000" serverip="192.168.60.3" duration="10" to="/mnt/station_ramdisk" eth_res_file="eth_test_{}.json" eth_st_file="eth_st_{}.json" retries="5" wait_retry="10"/>
+ <qeeprom i2c_address="0050" i2c_bus="0"/>
+ <qemmc device="mmcblk0" to="/mnt/station_ramdisk" emmc_res_file="emmc_status.txt"/>
+ <qplc firmware="" firmware_md5="" factory_tool="/root/hwtest-files/apps/configlayer" gen_ip="0" gen_mac="1" mtd_device="/dev/mtd0" factory_password="betera" firmware_password="paterna"/>
</setup>
<usb>
<usbdev MANUFACTURER="Linux 4.9.81+ ohci_hcd" PRODUCT="OHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0001" ignore="1"/>
diff --git a/test-cli/test/enums/StationStates.py b/test-cli/test/enums/StationStates.py
index 040e6bc..88dba1f 100644
--- a/test-cli/test/enums/StationStates.py
+++ b/test-cli/test/enums/StationStates.py
@@ -18,4 +18,5 @@ class StationStates(Enum):
WAITING_FOR_SCANNER = 50
FINISHED = 60
STATION_REBOOT = 2001
+ STATION_WAIT_SHUTDOWN = 2002
diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py
new file mode 100644
index 0000000..9b2829c
--- /dev/null
+++ b/test-cli/test/helpers/camara.py
@@ -0,0 +1,124 @@
+import cv2
+from test.helpers.syscmd import SysCommand
+
+class Camara(object):
+ __parent = None
+ __device_name = None
+ __device = None
+ __w = 1280
+ __h = 720
+ __contrast = 0.0
+ __brightness = 0.0
+ __saturation = 55.0
+ __hue = 0.0
+ __exposure = 166
+
+ def __init__(self, parent, device="video0", width=1280, height=720):
+ self.__parent = parent
+ self.__device_name = device
+ self.__w = width
+ self.__h = height
+
+ def Close(self):
+ if self.__device is not None:
+ del self.__device
+ self.__device = None
+
+ def Open(self):
+ self.Close()
+ self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name))
+ if self.__device.isOpened():
+ self.__configure()
+ return True
+ return False
+
+ def getSize(self):
+ return self.__w, self.__h
+
+ def setSize(self, w, h):
+ if self.__device is not None and self.__device.isOpened():
+ self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w)
+ self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h)
+ else:
+ self.__w = w
+ self.__h = h
+
+ def setContrast(self, newVal):
+ if self.__device.isOpened():
+ self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal)
+ else:
+ self.__contrast = newVal
+
+ def getContrast(self):
+ return self.__contrast
+
+ def setBrightness(self, newVal):
+ if self.__device.isOpened():
+ self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal)
+ else:
+ self.__brightness = newVal
+
+ def getBrightness(self):
+ return self.__brightness
+
+ def __configure(self):
+ self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w)
+ self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h)
+ cam_setup = SysCommand('v4lsetup', '{}/scripts/v4l-cam.sh'.format(self.__parent.getAppPath()))
+ cam_setup.execute()
+
+ #self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, self.__contrast)
+ #self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, self.__brightness)
+ #self.__saturation = self.__setCamVar(cv2.CAP_PROP_SATURATION, self.__saturation)
+ #self.__hue = self.__setCamVar(cv2.CAP_PROP_HUE, self.__hue)
+ #self.__exposure = self.__setCamVar(cv2.CAP_PROP_EXPOSURE, self.__exposure)
+
+
+ def __setCamVar(self, key, val):
+ valold = cv2.VideoCapture.get(self.__device, key)
+ if valold != val:
+ cv2.VideoCapture.set(self.__device, key, val)
+ t = cv2.VideoCapture.get(self.__device, key)
+ return t
+ return val
+
+ def __getCamVar(self, key):
+ return cv2.VideoCapture.get(self.__device, key);
+
+ def getFrameCount(self):
+ return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE);
+
+ def getFrame(self):
+ if self.__device.isOpened():
+ retval, image = self.__device.read()
+ if retval:
+ return image
+ return None
+ else:
+ return None
+
+ def getImageSize(self, image):
+ if hasattr(image, 'shape'):
+ return image.shape[1], image.shape[0]
+ else:
+ return (0, 0, 0)
+
+ def showFrame(self, name, frame, w=False, maxTime=3000):
+ cv2.imshow(name, frame)
+ if w:
+ if maxTime == -1:
+ cv2.waitKey()
+ else:
+ cv2.waitKey(maxTime)
+
+ def saveFrame(self, fileName, Frame):
+ cv2.imwrite(fileName, Frame)
+
+ def readImage(self, filename):
+ return cv2.imread('{}'.format(filename), 0)
+
+ def destroyWindow(self, name):
+ cv2.destroyWindow(name)
+
+ def closeWindows(self):
+ cv2.destroyAllWindows() \ No newline at end of file
diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py
index fad81ac..db94a1c 100644
--- a/test-cli/test/helpers/cmdline.py
+++ b/test-cli/test/helpers/cmdline.py
@@ -1,5 +1,5 @@
-class LinuxKernel:
+class LinuxKernelCmd:
__kernel_vars = {}
diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py
new file mode 100644
index 0000000..f127f2b
--- /dev/null
+++ b/test-cli/test/helpers/gpio.py
@@ -0,0 +1,55 @@
+import os
+
+class gpio (object):
+ gpioNum = None
+
+ def __init__(self, gpioNum, dir, val):
+ self.gpioNum = gpioNum
+ self.__export(gpioNum)
+ self.__set_dir(gpioNum, dir)
+ if dir == 'out':
+ self.__set_value(gpioNum, val)
+
+ def set_val(self, value):
+ self.__set_value( self.gpioNum, value)
+
+
+ def __export(self, gpio_number):
+ if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)):
+ try:
+ f = open("/sys/class/gpio/export", "w", newline="\n")
+ f.write(str(gpio_number))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __unexport(self, gpio_number):
+ if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)):
+ try:
+ f = open("/sys/class/gpio/unexport", "w", newline="\n")
+ f.write(str(gpio_number))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __set_dir(self, gpio_number, dir):
+ try:
+ f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n")
+ val = f.readline()
+ if val != dir:
+ f.write(dir)
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __set_value(self, gpio_number, value):
+ try:
+ f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n")
+ f.write(str(value))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True \ No newline at end of file
diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py
index d081853..133387c 100644
--- a/test-cli/test/helpers/int_registers.py
+++ b/test-cli/test/helpers/int_registers.py
@@ -22,6 +22,12 @@ def read(addr):
os.close(fd)
return "%08X" % retval[0]
+def imx8m_readid():
+ f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n")
+ val = f.readline()
+ f.close()
+ return val.rstrip()
+
def get_die_id(modelid):
dieid = ""
@@ -36,9 +42,10 @@ def get_die_id(modelid):
registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218]
elif modelid.find("OMAP5") == 0:
registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200]
+ elif modelid.find("IGEP0048") == 0:
+ return imx8m_readid()
else:
- raise Exception('modelid not defined')
-
+ raise Exception('modelid not defined: {}, modelid')
for rg in registers:
dieid = dieid + read(rg)
return dieid
diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py
index 785a78c..4b1087c 100644
--- a/test-cli/test/helpers/iseelogger.py
+++ b/test-cli/test/helpers/iseelogger.py
@@ -1,6 +1,6 @@
import logging
import logging.handlers
-
+import datetime
class ISEE_Logger(object):
__logger = None
@@ -37,3 +37,26 @@ class ISEE_Logger(object):
def getlogger(self):
return self.__logger
+class MeasureTime:
+ __difference = None
+ __first_time = None
+ __later_time = None
+
+ def __init__(self):
+ self.__first_time = datetime.datetime.now()
+
+ def start(self):
+ self.first_time = datetime.datetime.now()
+
+ def stop(self):
+ self.__later_time = datetime.datetime.now()
+ self.__difference = self.__later_time - self.__first_time
+ return self.__difference.total_seconds()
+
+ def getTime(self):
+ return self.__difference.total_seconds()
+
+
+global logObj
+logObj = ISEE_Logger(logging.INFO)
+
diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py
new file mode 100644
index 0000000..8e29448
--- /dev/null
+++ b/test-cli/test/helpers/iw.py
@@ -0,0 +1,124 @@
+from sh import iw
+from scanf import scanf
+
+class iw_scan:
+ __interface = None
+ __raw_unparsed = []
+ __raw = None
+
+ def __init__(self, interface='wlan0'):
+ self.__interface = interface
+
+ def scan(self):
+ self.__raw_unparsed = []
+ self.__raw = iw('{}'.format(self.__interface), 'scan')
+ if self.__raw.exit_code == 0:
+ self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n')
+ return True
+ return False
+
+ def getRawData (self):
+ return self.__raw_unparsed
+
+ def getInterface(self):
+ return self.__interface
+
+ def getRawObject(self):
+ return self.__raw
+
+
+class iwScan:
+ __iw_scan = None
+ __station_list = {}
+
+ def __init__(self, interface='wlan0'):
+ self.__iw_scan = iw_scan(interface)
+
+ def scan(self):
+ if self.__iw_scan.scan():
+ self.__parse_scan(self.__iw_scan.getRawData())
+ return True
+ return False
+
+ def getLastError(self):
+ rObject = self.__iw_scan.getRawObject()
+ if rObject.exit_code != 0:
+ return rObject.stderr
+ return ''
+
+ def __Parse_BBS(self, line):
+ data = {}
+ res = scanf("BSS %s(on %s) -- %s", line)
+ if res == None:
+ res = scanf("BSS %s(on %s)", line)
+ data["MAC"] = res[0]
+ data["DEV"] = res[1]
+ if len(res) == 3:
+ data["ASSOCIATED"] = 'YES'
+ else:
+ data["ASSOCIATED"] = 'NO'
+ return data
+
+ def __Parse_T(self, id, OnNotFoundAttr ,line):
+ data = {}
+ res = scanf("{}: %s".format(id), line)
+ if res == None:
+ data[id.upper()] = OnNotFoundAttr
+ else:
+ data[id.upper()] = res[0]
+ return data
+
+ def __Parse_X(self, line):
+ data = {}
+ res = scanf("DS Parameter set: channel %s", line)
+ if res == None:
+ data['CHANNEL'] = '-1'
+ else:
+ data['CHANNEL'] = res[0]
+ return data
+
+ def __block_stationlist(self, rawdata):
+ list = {}
+ count = 0
+ for line in rawdata:
+ idx = line.find('BSS')
+ if idx != -1 and idx == 0:
+ if len(list) > 0:
+ count += 1
+ self.__station_list[count] = list
+ list = self.__Parse_BBS(line)
+ elif line.find('SSID:') != -1:
+ list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)}
+ elif line.find('freq:') != -1:
+ list = {**list, **self.__Parse_T('freq', '', line)}
+ elif line.find('signal:') != -1:
+ list = {**list, **self.__Parse_T('signal', '', line)}
+ elif line.find('DS Parameter set:') != -1:
+ list = {**list, **self.__Parse_X(line)}
+ if count > 0:
+ count += 1
+ self.__station_list[count] = list
+
+ def __parse_scan(self, rawData):
+ self.__station_list = {}
+ self.__block_stationlist(rawData)
+ #print('{}'.format(self.__station_list))
+ #for i in self.__station_list:
+ # print(self.__station_list[i])
+
+ def findConnected (self):
+ for i in self.__station_list:
+ st = self.__station_list[i]
+ if st.get('ASSOCIATED', 'NO') == 'YES':
+ return True, self.__station_list[i]
+ return False, None
+
+ def findBySSID(self, ssid):
+ for i in self.__station_list:
+ st = self.__station_list[i]
+ if st.get('SSID', '') == ssid:
+ return True, self.__station_list[i]
+ return False, None
+
+ def getStations(self):
+ return self.__station_list \ No newline at end of file
diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py
new file mode 100644
index 0000000..9bc9e23
--- /dev/null
+++ b/test-cli/test/helpers/md5.py
@@ -0,0 +1,8 @@
+import hashlib
+
+def md5_file(fname):
+ hash_md5 = hashlib.md5()
+ with open(fname, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ return hash_md5.hexdigest() \ No newline at end of file
diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py
new file mode 100644
index 0000000..3b00934
--- /dev/null
+++ b/test-cli/test/helpers/plc.py
@@ -0,0 +1,57 @@
+import sh
+from sh import flashcp
+from sh import flash_erase
+from sh import ErrorReturnCode
+from sh import Command
+from test.helpers.gpio import gpio
+
+
+class dcpPLC(object):
+ __nReset = None
+ __Phy = None
+ __Plc = None
+ __mtd_device = None
+ __factool = None
+ __myConfigTool = None
+
+ def __init__(self, plcfactool, mtd_device):
+ # default save firmware
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ self.__nRest = gpio('78', 'out', '1')
+ self.__factool = plcfactool
+ self.__mtd_device = mtd_device
+ self.__myConfigTool = Command(self.__factool)
+
+ def setSaveFirmwareMode(self):
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ self.__nRest = gpio('78', 'out', '1')
+
+ def setBootMode(self):
+ self.__nRest = gpio('78', 'out', '0')
+ self.__nRest = gpio('75', 'out', '1')
+ self.__nRest = gpio('69', 'out', '1')
+
+ def SaveFirmware(self, firmare):
+ self.setSaveFirmwareMode()
+ try:
+ flash_erase(self.__mtd_device)
+ flashcp(firmare, self.__mtd_device)
+ except ErrorReturnCode as Error:
+ return False, "plc flash firmware failed {} ".format(Error.exit_code)
+ return True, ''
+
+ def set_plc (self, var, value, password):
+ try:
+ self.__myConfigTool("-o", "SET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password))
+ except ErrorReturnCode as Error:
+ return False, "set var failed {} {}".format(var,Error.exit_code)
+ return True, ''
+
+ def get_plc (self, var, value, password):
+ try:
+ self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password))
+ except ErrorReturnCode as Error:
+ return False, "set var failed {} {}".format(var,Error.exit_code)
+ return True, ''
diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py
index c98338a..e703c3a 100644
--- a/test-cli/test/helpers/psqldb.py
+++ b/test-cli/test/helpers/psqldb.py
@@ -31,6 +31,9 @@ class PgSQLConnection(object):
print(error)
return result
+ def getConfig(self):
+ return self.__db_config
+
def db_execute_query(self, query):
cur = self.__conection_object.cursor()
cur.execute(query)
diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py
index 744db54..f663e99 100644
--- a/test-cli/test/helpers/qrreader.py
+++ b/test-cli/test/helpers/qrreader.py
@@ -4,6 +4,7 @@ import threading
import time
import selectors
from selectors import DefaultSelector, EVENT_READ
+from test.helpers.iseelogger import logObj
selector = selectors.DefaultSelector()
@@ -37,7 +38,9 @@ class QRReader:
def getQRlist(self):
devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
+ logObj.getlogger().debug("{}".format(devices))
for device in devices:
+ logObj.getlogger().debug("{}".format(device.name))
if device.name in qrdevice_list:
self.__myReader['NAME'] = device.name
# print(self.__myReader['NAME'])
diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py
new file mode 100644
index 0000000..8f55d9f
--- /dev/null
+++ b/test-cli/test/helpers/sdl.py
@@ -0,0 +1,126 @@
+import sdl2.ext
+from sdl2 import *
+import os
+import time
+
+Colors = {
+ 'red': sdl2.ext.Color(255, 0, 0, 255),
+ 'green': sdl2.ext.Color(0, 255, 0, 255),
+ 'blue': sdl2.ext.Color(0, 0, 255, 255),
+ 'white': sdl2.ext.Color(255, 255, 255, 255),
+ 'black': sdl2.ext.Color(0, 0, 0, 255),
+}
+
+
+class SDL2(object):
+ __resources = None
+ __w = 1024
+ __h = 768
+ __winName = "noname"
+ __window = None
+ __factory = None
+ __renderer = None
+ __srender = None
+
+ def __init__(self, parent):
+ os.environ['SDL_VIDEODRIVER'] = "x11"
+ os.environ['DISPLAY'] = ":0"
+ self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files')
+ sdl2.ext.init()
+ self.__createWindow()
+
+ def __createWindow(self):
+ self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0),
+ flags=(
+ SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED))
+ self.__renderer = sdl2.ext.Renderer(self.__window)
+ self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer)
+ self.__srender = self.__factory.create_sprite_render_system(self.__window)
+ self.__window.show()
+ SDL_ShowCursor(SDL_DISABLE)
+
+ def createFont(self, fontName, fontSize, fontColor):
+ return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor)
+
+ def WriteText(self, text, x, y, fontManager):
+ imText = self.__factory.from_text(text, fontmanager=fontManager)
+ self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1]))
+
+ def show(self):
+ self.__window.show()
+
+ def hide(self):
+ self.__window.hide()
+
+ def setColor(self, red, green, blue, alpha):
+ self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha)
+
+ def fillColor(self, red, green, blue, alpha):
+ self.setColor(red, green, blue, alpha)
+ self.__renderer.clear()
+
+ def fillbgColor(self, color, update=False):
+ if color in Colors:
+ self.__renderer.color = Colors[color]
+ self.__renderer.clear()
+ if update:
+ self.update()
+
+ def update(self):
+ self.__renderer.present()
+
+ def showImage(self, filename):
+ im = self.__factory.from_image(self.__resources.get_path(filename))
+ self.__srender.render(im)
+
+
+class SDL2_Test(object):
+ __sdl2 = None
+
+ def __init__(self, parent):
+ self.__sdl2 = SDL2(parent)
+
+ def Clear(self):
+ self.__sdl2.fillbgColor('black', True)
+
+ def Paint(self, dcolor):
+ self.Clear()
+ self.__sdl2.fillbgColor(dcolor.lower(), True)
+
+
+class SDL2_Message(object):
+ __sdl2 = None
+ __fontManager_w = None
+ __fontManager_b = None
+
+ def __init__(self, parent):
+ self.__sdl2 = SDL2(parent)
+ self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white'])
+ self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black'])
+
+ def Clear(self):
+ self.__sdl2.fillbgColor('black', True)
+
+ def Paint(self, dcolor):
+ self.Clear()
+ self.__sdl2.fillbgColor(dcolor.lower(), True)
+
+ def __write_w(self, x, y, text):
+ self.__sdl2.WriteText(text, x, y, self.__fontManager_w)
+
+ def __write_b(self, x, y, text):
+ self.__sdl2.WriteText(text, x, y, self.__fontManager_b)
+
+ def setTestOK(self, board_uuid, qrdata):
+ self.Paint('GREEN')
+ self.__write_b(100, 50, 'TEST OK')
+ self.__write_b(100, 100, 'uuid={}'.format(board_uuid))
+ self.__write_b(100, 150, 'qr={}'.format(qrdata))
+ self.__sdl2.update()
+
+ def setTestERROR(self, error):
+ self.Paint('RED')
+ self.__write_w(100, 50, 'TEST FAILED')
+ self.__write_w(100, 100, '{}'.format(error))
+ self.__sdl2.update()
+
diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py
index 603a2ac..d61d1fb 100644
--- a/test-cli/test/helpers/setup_xml.py
+++ b/test-cli/test/helpers/setup_xml.py
@@ -1,6 +1,5 @@
import xml.etree.ElementTree as XMLParser
-
class XMLSetup (object):
"""XML Setup Parser"""
__tree = None # Parser
diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py
index 9579e7e..c2b2276 100644
--- a/test-cli/test/helpers/testsrv_db.py
+++ b/test-cli/test/helpers/testsrv_db.py
@@ -24,6 +24,9 @@ class TestSrv_Database(object):
self.__sqlObject = PgSQLConnection()
return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr())
+ def getConfig(self):
+ return self.__sqlObject.getConfig()
+
def create_board(self, processor_id, model_id, variant, station, bmac=None):
'''create a new board'''
if bmac is None:
@@ -41,7 +44,8 @@ class TestSrv_Database(object):
return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- # print(r)
+ print(r)
+ print(str(err))
return None
def get_tests_list(self, board_uuid):
@@ -95,10 +99,11 @@ class TestSrv_Database(object):
sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult)
# print('>>>' + sql)
try:
+ print('finish_test => SQL {}'.format(sql))
self.__sqlObject.db_execute_query(sql)
except Exception as err:
r = find_between(str(err), '#', '#')
- # print(r)
+ print('finish_test => {}'.format(err))
return None
def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype):
@@ -195,6 +200,18 @@ class TestSrv_Database(object):
def change_station_state(self, station, newstate):
sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station)
+ print('>>>' + sql)
+ try:
+ res = self.__sqlObject.db_execute_query(sql)
+ print(res)
+ return res[0][0]
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ print(r)
+ return None
+
+ def get_setup_variable(self, skey):
+ sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey)
# print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
@@ -205,7 +222,7 @@ class TestSrv_Database(object):
# print(r)
return None
- def get_setup_variable(self, skey):
+ def get_setup_variable(self, skey , default):
sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey)
# print('>>>' + sql)
try:
@@ -215,4 +232,4 @@ class TestSrv_Database(object):
except Exception as err:
r = find_between(str(err), '#', '#')
# print(r)
- return None
+ return default
diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py
new file mode 100644
index 0000000..8d2c27b
--- /dev/null
+++ b/test-cli/test/helpers/utils.py
@@ -0,0 +1,68 @@
+import json
+import socket
+import os
+import scanf
+
+def save_json_to_disk(filePath, description, mime, json_data, result):
+ with open(filePath, 'w') as outfile:
+ json.dump(json_data, outfile, indent=4)
+ outfile.close()
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+def save_file_to_disk(filePath, description, mime, data, result):
+ with open(filePath, 'w') as outfile:
+ outfile.write(data)
+ outfile.close()
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+def save_result (filePath, description, mime, result):
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+
+def str2bool(v):
+ return v.lower() in ("yes", "true", "t", "1")
+
+def station2Port(base):
+ Name = socket.gethostname()
+ res = scanf.scanf("Station%d", Name)
+ if res[0]:
+ NmBase = int(base)
+ res = int(res[0])
+ return str(NmBase + res)
+ return base
+
+
+def sys_read(sysnode):
+ try:
+ f = open(sysnode, "r")
+ data = f.readline()
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, data
+
+def removefileIfExist(fname):
+ if os.path.exists(fname):
+ try:
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''
diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py
index 2eb61a4..ece5a4e 100644
--- a/test-cli/test/runners/simple.py
+++ b/test-cli/test/runners/simple.py
@@ -5,6 +5,7 @@ Simple Test Runner for unittest module
import sys
import unittest
+from test.helpers.iseelogger import logObj
class SimpleTestRunner:
@@ -59,28 +60,34 @@ class TextTestResult(unittest.TestResult):
def addError(self, test, err):
unittest.TestResult.addError(self, test, err)
- # test.longMessage = err[1]
+ test.longMessage = err[1]
self.result = self.ERROR
- print(err[1])
+ print(err)
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
test.longMessage = err[1]
self.result = self.FAIL
+ logObj.getlogger().info('{}:{}'.format(test, err))
def stopTest(self, test):
- unittest.TestResult.stopTest(self, test)
- # display: print test result
- self.runner.writeUpdate(self.result)
- # save result data
- for result in test.getresults():
- self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"],
- result["description"], result["filepath"], result["mimetype"])
- # SEND TO DB THE RESULT OF THE TEST
- if self.result == self.PASS:
- status = "TEST_COMPLETE"
- resulttext = test.gettextresult()
- else:
- status = "TEST_FAILED"
- resulttext = test.longMessage
- self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext)
+ try:
+ unittest.TestResult.stopTest(self, test)
+ # display: print test result
+ self.runner.writeUpdate(self.result)
+ # save result data
+ for result in test.getresults():
+ self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"],
+ result["description"], result["filepath"], result["mimetype"])
+ # SEND TO DB THE RESULT OF THE TEST
+ if self.result == self.PASS:
+ status = "TEST_COMPLETE"
+ resulttext = test.gettextresult()
+ logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext))
+ else:
+ status = "TEST_FAILED"
+ resulttext = test.longMessage
+ logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext))
+ self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext)
+ except Exception as E:
+ logObj.getlogger().error('Exception: [{}]{}:{}:{}:{}'.format(E, test.params["testidctl"], test.params["testid"], status, resulttext)) \ No newline at end of file
diff --git a/test-cli/test/tasks/flasheeprom.py b/test-cli/test/tasks/flasheeprom.py
index feeb04c..775c556 100644
--- a/test-cli/test/tasks/flasheeprom.py
+++ b/test-cli/test/tasks/flasheeprom.py
@@ -2,55 +2,76 @@ import os
import binascii
from test.helpers.globalVariables import globalVar
+class EEprom_Flasher:
+ __Name = None
+ __varlist = None
+ __xmlObj = None
+ __lastError = ''
-def _generate_data_bytes(boarduuid, mac0, mac1):
- data = bytearray()
- data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E
- data += bytearray([0, 0, 0, 0]) # crc32 = 0
- data += bytearray(boarduuid + "\0",
+ def __init__(self, varlist):
+ self.__varlist = varlist
+ self.__xmlObj = varlist['xml']
+ self.__Name = varlist.get('name_eeprom', self.__xmlObj.getKeyVal('EEProm_Flasher', 'name', 'EEProm_Flasher'))
+ # self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash')
+ # self.__ImagePath = varlist.get('flashimagepath', '')
+ # self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1'))
+
+ def getTaskName(self):
+ return self.__Name
+
+ def getError(self):
+ return self.__lastError
+
+ def Execute(self):
+ return True, None
+
+ def __generate_data_bytes(self, boarduuid, mac0, mac1):
+ data = bytearray()
+ data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E
+ data += bytearray([0, 0, 0, 0]) # crc32 = 0
+ data += bytearray(boarduuid + "\0",
'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end
- data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7'
- if mac1 is not None:
- data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7'
- else:
- data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0
- # calculate crc
- crc = (binascii.crc32(data, 0)).to_bytes(4, 'big')
- data[4:8] = crc
- return data
-
-
-def _generate_output_data(data_rx):
- boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql
- mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii')
- mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii')
- smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2))
- smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2))
- eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1
-
- return eepromdata
-
-
-# returns exitcode and data saved into eeprom
-def flash_eeprom(eeprompath, boarduuid, mac0, mac1=None):
- print("Start programming Eeprom...")
- # check if eeprompath is correct
- if os.path.isfile(eeprompath):
- # create u-boot data struct
- data = _generate_data_bytes(boarduuid, mac0, mac1)
- # write into eeprom and read back
- f = open(eeprompath, "r+b")
- f.write(data)
- f.seek(0)
- data_rx = f.read(57)
- for i in range(57):
- if data_rx[i] != data[i]:
- print("Error while programming eeprom memory.")
- return 1, None
- print("Eeprom programmed succesfully.")
- # generated eeprom read data
- eepromdata = _generate_output_data(data_rx)
- return 0, eepromdata
- else:
- print("Eeprom memory not found.")
- return 1, None
+ data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7'
+ if mac1 is not None:
+ data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7'
+ else:
+ data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0
+ # calculate crc
+ crc = (binascii.crc32(data, 0)).to_bytes(4, 'big')
+ data[4:8] = crc
+ return data
+
+
+ def __generate_output_data(self, data_rx):
+ boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql
+ mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii')
+ mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii')
+ smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2))
+ smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2))
+ eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1
+ return eepromdata
+
+
+ # returns exitcode and data saved into eeprom
+ def flash_eeprom(self, eeprompath, boarduuid, mac0, mac1=None):
+ print("Start programming Eeprom...")
+ # check if eeprompath is correct
+ if os.path.isfile(eeprompath):
+ # create u-boot data struct
+ data = self.__generate_data_bytes(boarduuid, mac0, mac1)
+ # write into eeprom and read back
+ f = open(eeprompath, "r+b")
+ f.write(data)
+ f.seek(0)
+ data_rx = f.read(57)
+ for i in range(57):
+ if data_rx[i] != data[i]:
+ print("Error while programming eeprom memory.")
+ return 1, None
+ print("Eeprom programmed succesfully.")
+ # generated eeprom read data
+ eepromdata = self.__generate_output_data(data_rx)
+ return 0, eepromdata
+ else:
+ print("Eeprom memory not found.")
+ return 1, None
diff --git a/test-cli/test/tasks/flashmemory.py b/test-cli/test/tasks/flashmemory.py
index 3be56d7..ad9d92a 100644
--- a/test-cli/test/tasks/flashmemory.py
+++ b/test-cli/test/tasks/flashmemory.py
@@ -1,12 +1,44 @@
import sh
+import os
+class NAND_Flasher:
+ __Name = None
+ __varlist = None
+ __xmlObj = None
+ __igepflashPath = None
+ __lastError = ''
+
+ def __init__(self, varlist):
+ self.__varlist = varlist
+ self.__xmlObj = varlist['xml']
+ self.__Name = varlist.get('name_flashnand', self.__xmlObj.getKeyVal('NAND_Flasher', 'name', 'NAND_Flasher'))
+ self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash')
+ self.__ImagePath = varlist.get('flashimagepath', '')
+ self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1'))
+
+ def getTaskName(self):
+ return self.__Name
+
+ def getError(self):
+ return self.__lastError
+
+ def Execute(self):
+ try:
+ self.__lastError = ''
+ if not os.path.isfile(self.__ImagePath):
+ self.__lastError('Not Image Found: {}'.format(self.__ImagePath))
+ return False, None
+ if self.__SkipNandtest == '1':
+ p = sh.bash('{}'.format(self.__igepflashPath), "--skip-nandtest", "--image", self.__ImagePath)
+ else:
+ p = sh.bash('{}'.format(self.__igepflashPath), "--image", self.__ImagePath)
+ if p.exit_code != 0:
+ return False, None
+ except OSError as e:
+ self.__lastError('Exception: {}'.format(e.strerror))
+ return False, None
+ except Exception as Error:
+ self.__lastError('Exception: {}'.format(Error))
+ return False, None
+ return True, None
-def flash_memory(imagepath):
- print("Start programming Nand memory...")
- p = sh.bash("/usr/bin/igep-flash", "--skip-nandtest", "--image", imagepath)
- if p.exit_code != 0:
- print("Flasher: Could not complete flashing task.")
- return 1
- else:
- print("Flasher: NAND memory flashed succesfully.")
- return 0
diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py
index 58054c9..df340eb 100644
--- a/test-cli/test/tests/qamper.py
+++ b/test-cli/test/tests/qamper.py
@@ -5,6 +5,7 @@ from test.helpers.amper import Amper
class Qamper(unittest.TestCase):
params = None
__resultlist = None # resultlist is a python list of python dictionaries
+ dummytest = False
# varlist: undercurrent, overcurrent
def __init__(self, testname, testfunc, varlist):
@@ -21,8 +22,28 @@ class Qamper(unittest.TestCase):
raise Exception('overcurrent param inside Qamp must be defined')
self._testMethodDoc = testname
self.__resultlist = []
+ if "dummytest" in varlist:
+ self.dummytest = varlist["dummytest"]
+
+ def saveresultinfile(self, currentvalue):
+ # save result in a file
+ with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile:
+ n = outfile.write("Current: {} A".format(currentvalue))
+ outfile.close()
+ self.__resultlist.append(
+ {
+ "description": "Amperimeter values",
+ "filepath": "/mnt/station_ramdisk/amper.txt",
+ "mimetype": "text/plain"
+ }
+ )
def execute(self):
+ if self.dummytest:
+ self.current = "0.0"
+ self.saveresultinfile(self.current)
+ return
+
amp = Amper()
# open serial connection
if not amp.open():
@@ -35,16 +56,7 @@ class Qamper(unittest.TestCase):
# get current value (in Amperes)
self.current = amp.getCurrent()
# save result in a file
- with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile:
- n = outfile.write("Current: {} A".format(self.current))
- outfile.close()
- self.__resultlist.append(
- {
- "description": "Amperimeter values",
- "filepath": "/mnt/station_ramdisk/amper.txt",
- "mimetype": "text/plain"
- }
- )
+ self.saveresultinfile(self.current)
# close serial connection
amp.close()
# Check current range
diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py
index 3d2113a..2c86809 100644
--- a/test-cli/test/tests/qaudio.py
+++ b/test-cli/test/tests/qaudio.py
@@ -2,56 +2,78 @@ import unittest
import sh
import wave
import contextlib
-
-
-def calc_audio_duration(fname):
- with contextlib.closing(wave.open(fname, 'r')) as f:
- frames = f.getnframes()
- rate = f.getframerate()
- duration = frames / float(rate)
- f.close()
- return duration
-
+import uuid
+import time
+from test.helpers.utils import save_result
+from test.helpers.iseelogger import logObj
class Qaudio(unittest.TestCase):
params = None
__resultlist = None # resultlist is a python list of python dictionaries
__dtmf_secuence = ["1", "3", "5", "7", "9"]
+ __dtmf_file = None
+ __file_uuid = None
+ __QaudioName = None
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qaudio, self).__init__(testfunc)
self._testMethodDoc = testname
self.__resultlist = []
+ self.__xmlObj = varlist["xml"]
+ self.__QaudioName = varlist.get('name', 'qaudio')
+ self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav"))
+ self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15"))
+ self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav"))
+ self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000"))
+ self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk"))
+ self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0")))
+ self.__file_uuid = uuid.uuid4()
+ self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid)
+
+ def restoreAlsaCtl(self):
+ try:
+ sh.alsactl('restore');
+ except Exception as E:
+ logObj.getlogger().error("Failed to Execite alsactl restore");
def execute(self):
- # analize audio file
- recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15
+ self.restoreAlsaCtl()
+ try:
+ # analize audio file
+ calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1)
+ except TypeError as Error:
+ self.fail("failed: TypeError: {}.".Error)
+
# previous play to estabilise audio lines
- p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True)
- p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True)
- p1.wait()
- p2.wait()
- # play and record
- p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True)
- p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True)
- p1.wait()
+ p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True)
+ time.sleep(self.__run_delay)
+ p1 = sh.aplay(self.__dtmf_file_play)
p2.wait()
if p1.exit_code == 0 and p2.exit_code == 0:
- p = sh.multimon("-t", "wav", "-a", "DTMF", "/mnt/station_ramdisk/recorded.wav", "-q")
+ p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q")
if p.exit_code == 0:
lines = p.stdout.decode('ascii').splitlines()
self.__dtmf_secuence_result = []
for li in lines:
if li.split(" ")[0] == "DTMF:":
self.__dtmf_secuence_result.append(li.split(" ")[1])
+ self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result)))
# compare original and processed dtmf sequence
if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result):
for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result):
if a != b:
- self.fail("failed: sent and received DTMF sequence don't match.")
+ logObj.getlogger().debug(
+ "failed: sent and received DTMF sequence do not match.{} != {}".format(
+ self.__dtmf_secuence, self.__dtmf_secuence_result))
+ self.fail("failed: sent and received DTMF sequence do not match.")
+ save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record),
+ description='sound record',
+ mime='audio/x-wav',
+ result=self.__resultlist)
else:
- self.fail("failed: received DTMF sequence is shorter than expected.")
+ logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result))
+ self.fail("received DTMF sequence is shorter than expected")
else:
self.fail("failed: unable to use multimon command.")
else:
@@ -62,3 +84,12 @@ class Qaudio(unittest.TestCase):
def gettextresult(self):
return ""
+
+ def calc_audio_duration(self, fname):
+ duration = 0.0
+ with contextlib.closing(wave.open(fname, 'r')) as f:
+ frames = f.getnframes()
+ rate = f.getframerate()
+ duration = frames / float(rate)
+ f.close()
+ return duration
diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py
index b35f1ff..39a5047 100644
--- a/test-cli/test/tests/qdmesg.py
+++ b/test-cli/test/tests/qdmesg.py
@@ -1,4 +1,5 @@
import sh
+import os
import os.path
from os import path
@@ -9,24 +10,23 @@ class Qdmesg:
def __init__(self, testname, testfunc, varlist):
self.params = varlist
- self._testMethodDoc = testname
+ self.__testMethodDoc = testname
self.__resultlist = []
self.pgObj = varlist["db"]
+ self.__xmlObj = varlist["xml"]
+ self.__QdmesgName = varlist.get('name', 'qdmesg')
+ self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg",
+ "/var/log/kern.log"))
+
+ def getTestName(self):
+ return self.__testMethodDoc
def execute(self):
- print("running dmesg test")
self.pgObj.run_test(self.params["testidctl"], self.params["testid"])
- # delete previous file
- if path.exists("/mnt/station_ramdisk/dmesg.txt"):
- os.remove("/mnt/station_ramdisk/dmesg.txt")
- # generate file
- p = sh.dmesg("--color=never", _out="/mnt/station_ramdisk/dmesg.txt")
- if p.exit_code == 0:
- # save dmesg result in DB
- self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output",
- "/mnt/station_ramdisk/dmesg.txt", "text/plain")
- self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "")
- print("success dmesg test")
- else:
+ if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)):
self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "")
- print("fail dmesg test")
+ return False
+ self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file),
+ "{}".format(self.__syslog_dmesg_file), "text/plain")
+ self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "")
+ return True
diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py
index 7fc9fcd..7900381 100644
--- a/test-cli/test/tests/qeeprom.py
+++ b/test-cli/test/tests/qeeprom.py
@@ -1,46 +1,65 @@
-import sh
import unittest
-
+import os
class Qeeprom(unittest.TestCase):
params = None
- __position = None
- __eeprompath = None
__resultlist = None # resultlist is a python list of python dictionaries
+ __QEepromName = None
+ __i2cBus = None
+ __device = None
# varlist: position, eeprompath
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qeeprom, self).__init__(testfunc)
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QEepromName = varlist.get('name', 'qeeprom')
+ self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050"))
+ self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0"))
+ self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress)
+
self.__resultlist = []
- if "position" in varlist:
- self.__position = varlist["position"]
- else:
- raise Exception('position param inside Qeeprom must be defined')
- if "eeprompath" in varlist:
- self.__eeprompath = varlist["eeprompath"]
- else:
- raise Exception('eeprompath param inside Qeeprom must be defined')
+
+ def __check_device(self):
+ if os.path.isfile(self.__device):
+ return True
+ return False
+
+ def __eeprom_write(self, data):
+ try:
+ f = open(self.__device, "wb")
+ f.write(data)
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, ''
+
+ def __eeprom_read(self, size):
+ try:
+ f = open(self.__device, "rb")
+ data = f.read(size)
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, data
+
def execute(self):
- # generate some data
- data_tx = "isee_test"
- # write data into the eeprom
- p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position)
- if p.exit_code == 0:
- # read data from the eeprom
- p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + str(len(data_tx)))
- if p.exit_code == 0:
- data_rx = p.stdout.decode('ascii')
- # compare both values
- if data_rx != data_tx:
- # Both data are different
- self.fail("failed: mismatch between written and received values.")
- else:
- self.fail("failed: Unable to read from the EEPROM device.")
- else:
- self.fail("failed: Unable to write on the EEPROM device.")
+ if not self.__check_device():
+ self.fail("cannot open device {}".format(self.__device))
+
+ data = bytes('AbcDeFgHijK098521', 'utf-8')
+
+ v, w = self.__eeprom_write(data)
+ if not v:
+ self.fail("eeprom: {} write test failed".format(w))
+ v, r = self.__eeprom_read(len(data))
+ if not v:
+ self.fail("eeprom: {} read test failed".format(r))
+ if r != data:
+ self.fail("mismatch between write and read bytes")
+
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py
index 81acef1..7d081a1 100644
--- a/test-cli/test/tests/qethernet.py
+++ b/test-cli/test/tests/qethernet.py
@@ -2,7 +2,11 @@ import unittest
import sh
import json
import time
-
+import uuid
+import iperf3
+import netifaces
+from test.helpers.utils import save_json_to_disk
+from test.helpers.utils import station2Port
class Qethernet(unittest.TestCase):
__serverip = None
@@ -16,29 +20,66 @@ class Qethernet(unittest.TestCase):
# varlist content: serverip, bwexpected, port
def __init__(self, testname, testfunc, varlist):
- # save the parameters list
self.params = varlist
- # configure the function to be executed when the test runs. "testfunc" is a name of a method inside this
- # class, that in this situation, it can only be "execute".
super(Qethernet, self).__init__(testfunc)
- # validate and get the parameters
- if "serverip" in varlist:
- self.__serverip = varlist["serverip"]
- else:
- raise Exception('sip param inside Qethernet have been be defined')
- if "bwexpected" in varlist:
- self.__bwexpected = varlist["bwexpected"]
- else:
- raise Exception('bwexpected param inside Qethernet must be defined')
- if "port" in varlist:
- self.__port = varlist["port"]
- else:
- raise Exception('port param inside Qethernet must be defined')
- self.__numbytestx = "10M"
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QEthName = varlist.get('name', 'qeth')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1"))
+ self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000"))
+ self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0"))
+ self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost"))
+ self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10"))
+ self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk"))
+ self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5"))
+ self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10"))
+ self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json"))
+ self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json"))
+ self.__file_uuid = uuid.uuid4()
+ self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid)
+ self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid)
+
self.__resultlist = []
+ def checkInterface(self, reqInterface):
+ ifaces = netifaces.interfaces()
+ for t in ifaces:
+ if t == reqInterface:
+ return True
+ return False
+
def execute(self):
+ self.__resultlist = []
+ # check interfaces
+ if not self.checkInterface(self.__interface):
+ self.fail('Interface not found: {}'.format(self.__interface))
+
+ # Start Iperf
+ client = iperf3.Client()
+ client.duration = int(self.__duration)
+ client.server_hostname = self.__serverip
+ client.port = station2Port(self.__port)
+ count = int(self.__retriesCount)
+ while count > 0:
+ result = client.run()
+ if result.error == None:
+ if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw):
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file),
+ description='eth {} iperf3'.format(self.__interface),
+ mime='application/json',
+ json_data=result.json,
+ result=self.__resultlist)
+ return
+ else:
+ self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps))
+ else:
+ count = count - 1
+ time.sleep(int(self.__waitRetryTime))
+
+ self.fail('qEth (max tries) Execution error: {}'.format(result.error))
+
+ def execute2(self):
# execute iperf command against the server, but it implements attempts in case the server is busy
iperfdone = False
while not iperfdone:
diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py
index f10757e..0f5a0c1 100644
--- a/test-cli/test/tests/qmmcflash.py
+++ b/test-cli/test/tests/qmmcflash.py
@@ -1,5 +1,10 @@
import unittest
-import sh
+import scanf
+from scanf import scanf
+from sh import mmc
+from sh import ErrorReturnCode
+from test.helpers.utils import save_file_to_disk
+from test.helpers.utils import sys_read
class Qmmcflash(unittest.TestCase):
params = None
@@ -8,8 +13,43 @@ class Qmmcflash(unittest.TestCase):
self.params = varlist
super(Qmmcflash, self).__init__(testfunc)
self._testMethodDoc = testname
- # TODO
+ self.__xmlObj = varlist["xml"]
+ self.__QeMMCName = varlist.get('name', 'qemmc')
+ self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk"))
+ self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt"))
+ self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0"))
+ self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001"))
+
+ self.__resultlist = []
def execute(self):
- # TODO \ No newline at end of file
+ try:
+ dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice))
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file),
+ description='eMMC health test',
+ mime='text/plain',
+ data=dataOut.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID)
+ r, data = sys_read("{}/life_time".format(sysDevice))
+ if not r:
+ self.fail("emmc: life_time not found")
+ res = scanf("0x%d 0x%d", data)
+ if res[0] > 3 or res[1] > 3:
+ self.fail("emmc: review {} life_time > 3".format(sysDevice))
+ r, data = sys_read("{}/pre_eol_info".format(sysDevice))
+ if not r:
+ self.fail("emmc: pre_eol_info not found")
+ res = scanf("0x%d", data)
+ if res[0] != 1:
+ self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice))
+ except ErrorReturnCode as Error:
+ self.fail("emmc: failed {} ".format(Error.exit_code))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return "" \ No newline at end of file
diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py
index 988ab60..6de1eaf 100644
--- a/test-cli/test/tests/qnand.py
+++ b/test-cli/test/tests/qnand.py
@@ -1,6 +1,7 @@
import unittest
import sh
-
+import uuid
+from test.helpers.utils import save_file_to_disk
class Qnand(unittest.TestCase):
params = None
@@ -11,29 +12,34 @@ class Qnand(unittest.TestCase):
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qnand, self).__init__(testfunc)
- if "device" in varlist:
- self.__device = varlist["device"]
- else:
- raise Exception('device param inside Qnand must be defined')
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QNandName = varlist.get('name', 'qnand')
+ self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0"))
+ self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk"))
+ self.__file_uuid = uuid.uuid4()
+ self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid)
+ nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"')
+
+ if nandtestPath == '':
+ self.myNandTest = sh.nandtest
+ else:
+ self.myNandTest = sh.Command(nandtestPath)
self.__resultlist = []
def execute(self):
try:
- p = sh.nandtest("-m", self.__device)
- # save result
- with open('/mnt/station_ramdisk/nand-nandtest.txt', 'w') as outfile:
- n = outfile.write(p.stdout.decode('ascii'))
- outfile.close()
- self.__resultlist.append(
- {
- "description": "nandtest output",
- "filepath": "/mnt/station_ramdisk/nand-nandtest.txt",
- "mimetype": "text/plain"
- }
- )
- except sh.ErrorReturnCode as e:
- self.fail("failed: could not complete nandtest command")
+ res = self.myNandTest("-m", self.__device)
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file),
+ description='nand {} test'.format(self.__device),
+ mime='text/plain',
+ data=res.stdout.decode('utf-8'),
+ result=self.__resultlist)
+
+ except sh.ErrorReturnCode_1 as e:
+ self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout))
+
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py
new file mode 100644
index 0000000..01258a9
--- /dev/null
+++ b/test-cli/test/tests/qplc.py
@@ -0,0 +1,70 @@
+import shutil
+import unittest
+import os.path
+import os
+import sh
+
+from test.helpers.md5 import md5_file
+from test.helpers.plc import dcpPLC
+
+class Qplc(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __QPLCName = None
+ __plc = None
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qplc, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+
+ self.__QPLCName = varlist.get('name', 'qplc')
+ self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", ""))
+ self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", ""))
+ self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", ""))
+ self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0"))
+ self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0"))
+ self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0"))
+ self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device)
+
+ self.__resultlist = []
+
+
+ def checkfiles(self):
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)):
+ return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source)
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)):
+ return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5)
+ if self.__check_mounted != '':
+ if not os.path.ismount('{}'.format(self.__check_mounted)):
+ return False, 'Required mounted failed: {}'.format(self.__path_to)
+ r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if not r:
+ return r, s
+ return True, ''
+
+ def __flashMemory(self):
+ self.__plc.setSaveFirmwareMode()
+ r, v = self.__plc.SaveFirmware(self.__firmware)
+ if not r:
+ self.fail(v)
+
+ def execute(self):
+ # Check files
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ # do flash & verify
+ self.__flashMemory()
+ # do Plc boot
+ self.__plc.setBootMode()
+ # Wait plc goes UP
+
+
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py
index 21a01c8..49d4d54 100644
--- a/test-cli/test/tests/qram.py
+++ b/test-cli/test/tests/qram.py
@@ -1,33 +1,59 @@
import unittest
import sh
-
+# from test.helpers.iseelogger import MeasureTime
+# from test.helpers.iseelogger import logObj
+from test.helpers.utils import save_file_to_disk
class Qram(unittest.TestCase):
params = None
- __memsize = "10M"
- __loops = "1"
- __resultlist = None # resultlist is a python list of python dictionaries
+ __memsize = None
+ __loops = None
+ __resultlist = [] # resultlist is a python list of python dictionaries
# varlist: memsize, loops
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qram, self).__init__(testfunc)
- if "memsize" in varlist:
- self.__memsize = varlist["memsize"]
- else:
- raise Exception('memsize param inside Qram must be defined')
- if "loops" in varlist:
- self.__loops = varlist["loops"]
- else:
- raise Exception('loops param inside Qram must be defined')
self._testMethodDoc = testname
- self.__resultlist = []
+ self.__xmlObj = varlist["xml"]
+
+ self.__QramName = varlist.get('name', 'qram')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1"))
+ self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk"))
+ self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt"))
+
+ self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0"))
+ self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0"))
+
+ memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '')
+
+ if memtesterPath == '':
+ self.myMemtester = sh.memtester
+ else:
+ self.myMemtester = sh.Command(memtesterPath)
def execute(self):
+ self.__resultlist = []
try:
- p = sh.memtester(self.__memsize, "1", _out="/dev/null")
+ # mytime = MeasureTime()
+ res = self.myMemtester(self.__memsize, '{}'.format(self.__loops))
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file),
+ description='Ram result test',
+ mime='text/plain',
+ data=res.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ # mytime.stop()
except sh.ErrorReturnCode as e:
- self.fail("failed: could not complete memtester command")
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file),
+ description='Ram result test',
+ mime='text/plain',
+ data=e.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8')))
+
+ except Exception as details:
+ self.fail('Error: {}'.format(details))
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py
index 6302012..3182685 100644
--- a/test-cli/test/tests/qusb.py
+++ b/test-cli/test/tests/qusb.py
@@ -1,91 +1,90 @@
-import sh
+import shutil
import unittest
-from test.helpers.usb import USBDevices
import os.path
+import os
+import sh
+from test.helpers.md5 import md5_file
class Qusb(unittest.TestCase):
params = None
__resultlist = None # resultlist is a python list of python dictionaries
+ __QusbName = None
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qusb, self).__init__(testfunc)
self._testMethodDoc = testname
- if "repetitions" in varlist:
- self.__repetitions = varlist["repetitions"]
- else:
- raise Exception('repetitions param inside Qusb must be defined')
+ self.__xmlObj = varlist["xml"]
+
+ self.__QusbName = varlist.get('name', 'qusb')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3"))
+ self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files"))
+ self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin"))
+ self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5"))
+ self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2"))
+ self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", ""))
+
self.__resultlist = []
- def execute(self):
- # get usb device
- dev_obj = USBDevices(self.params["xml"])
- if dev_obj.getMassStorage():
- device = dev_obj.getMassStorage()['disk'] + "1"
- else:
- self.fail("failed: No USB memory found.")
- # check if the device is mounted, and umount it
- try:
- p = sh.findmnt("-n", device)
- if p.exit_code == 0:
- sh.umount(device)
- except sh.ErrorReturnCode as e:
- # error = 1 means "no found"
- pass
- # mount the device
- sh.mkdir("-p", "/mnt/station_ramdisk/pendrive")
- p = sh.mount(device, "/mnt/station_ramdisk/pendrive")
- if p.exit_code != 0:
- self.fail("failed: Unable to mount the USB memory device.")
- # execute test
- for i in range(int(self.__repetitions)):
- # copy files
+ def removefileIfExist(self, fname):
+ if os.path.exists(fname):
try:
- p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin",
- "/var/lib/hwtest-files/usbdatatest.bin.md5",
- "/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode as e:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: Unable to copy files to the USB memory device.")
- # check if the device is still mounted
- if not os.path.ismount("/mnt/station_ramdisk/pendrive"):
- sh.rm("/mnt/station_ramdisk/pendrive/*")
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: USB device unmounted during/after copying files.")
- # check MD5
- try:
- p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin")
- except sh.ErrorReturnCode as e:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: Unable to calculate MD5 of the copied file.")
- newmd5 = p.stdout.decode().split(" ")[0]
- with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile:
- oldmd5 = outfile.read().rstrip("\n")
- outfile.close()
- if newmd5 != oldmd5:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: MD5 check failed.")
- # delete copied files
- sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5")
- # Finish
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''
+
+ def checkfiles(self):
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)):
+ return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source)
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)):
+ return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5)
+ if self.__check_mounted != '':
+ if not os.path.ismount('{}'.format(self.__check_mounted)):
+ return False, 'Required mounted failed: {}'.format(self.__path_to)
+ r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if not r:
+ return r, s
+ return True, ''
+
+ def getTestFile_md5(self, fname):
+ t = ''
+ with open('{}'.format(fname), 'r') as outfile:
+ t = outfile.read().rstrip("\n")
+ outfile.close()
+ return t
+
+ def ExecuteTranfer(self):
try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
+ originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5))
+ if originalMD5 == '':
+ self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5))
+ # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.sync()
+ md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source))
+ self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if originalMD5 != md5val:
+ return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val)
+ except OSError as why:
+ return False,'Error: {} tranfer failed'.format(why.errno)
+ except Exception as details:
+ return False, 'USB Exception: {} tranfer failed'.format(details)
+ return True, ''
+
+ def execute(self):
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ countLoops = int(self.__loops)
+ while countLoops > 0:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ self.fail(msg)
+ countLoops = countLoops - 1
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py
deleted file mode 100644
index 05b22c3..0000000
--- a/test-cli/test/tests/qusbdual.py
+++ /dev/null
@@ -1,90 +0,0 @@
-import sh
-import unittest
-import os.path
-import time
-
-
-class Qusbdual(unittest.TestCase):
- params = None
- __resultlist = None # resultlist is a python list of python dictionaries
-
- def __init__(self, testname, testfunc, varlist):
- self.params = varlist
- super(Qusbdual, self).__init__(testfunc)
- self._testMethodDoc = testname
- if "repetitions" in varlist:
- self.__repetitions = varlist["repetitions"]
- else:
- raise Exception('repetitions param inside Qusbdual must be defined')
- self.__resultlist = []
-
- def execute(self):
- # check if file-as-filesystem exists
- if not os.path.isfile('/var/lib/hwtest-files/mass-otg-test.img'):
- self.fail("failed: Unable to find file-as-filesystem image.")
- # generate mass storage gadget
- p = sh.modprobe("g_mass_storage", "file=/var/lib/hwtest-files/mass-otg-test.img")
- if p.exit_code != 0:
- self.fail("failed: Unable to create a mass storage gadget.")
- # wait to detect the new device
- time.sleep(3)
- # find the mass storage device
- try:
- p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux")
- except sh.ErrorReturnCode:
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: could not find any mass storage gadget")
- device = p.stdout.decode().split(" ")[0]
- # mount the mass storage gadget
- sh.mkdir("-p", "/mnt/station_ramdisk/hdd_gadget")
- p = sh.mount("-o", "ro", "/dev/" + device, "/mnt/station_ramdisk/hdd_gadget")
- if p.exit_code != 0:
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: Unable to mount the mass storage gadget.")
- # execute test
- for i in range(int(self.__repetitions)):
- # copy files
- try:
- p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin",
- "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5",
- "/mnt/station_nfsdisk/")
- except sh.ErrorReturnCode as e:
- sh.umount("/mnt/station_ramdisk/hdd_gadget")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: Unable to copy files through USB.")
- # check if the device is still mounted
- if not os.path.ismount("/mnt/station_ramdisk/hdd_gadget"):
- sh.rm("/mnt/station_ramdisk/hdd_gadget/*")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: USB device unmounted during/after copying files.")
- # Check md5
- try:
- p = sh.md5sum("/mnt/station_nfsdisk/usb-test.bin")
- except sh.ErrorReturnCode as e:
- sh.umount("/mnt/station_ramdisk/hdd_gadget")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: Unable to calculate MD5 of the copied file.")
- newmd5 = p.stdout.decode().split(" ")[0]
- with open('/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5', 'r') as outfile:
- oldmd5 = outfile.read().rstrip("\n")
- outfile.close()
- if newmd5 != oldmd5:
- sh.umount("/mnt/station_ramdisk/hdd_gadget")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: MD5 check failed.")
- # delete copied files
- sh.rm("-f", "/mnt/station_nfsdisk/usb-test.bin", "/mnt/station_nfsdisk/usb-test.bin.md5")
- # Finish
- sh.umount("/mnt/station_ramdisk/hdd_gadget")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
-
- def getresults(self):
- return self.__resultlist
-
- def gettextresult(self):
- return ""
diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py
index 3dd9e38..4695041 100644
--- a/test-cli/test/tests/qwifi.py
+++ b/test-cli/test/tests/qwifi.py
@@ -1,9 +1,11 @@
import unittest
-import sh
-import re
-import json
import time
-
+import uuid
+import iperf3
+import netifaces
+from test.helpers.iw import iwScan
+from test.helpers.utils import save_json_to_disk
+from test.helpers.utils import station2Port
class Qwifi(unittest.TestCase):
__serverip = None
@@ -19,81 +21,76 @@ class Qwifi(unittest.TestCase):
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qwifi, self).__init__(testfunc)
- if "serverip" in varlist:
- self.__serverip = varlist["serverip"]
- else:
- raise Exception('serverip param inside Qwifi have been be defined')
- if "bwexpected" in varlist:
- self.__bwexpected = varlist["bwexpected"]
- else:
- raise Exception('OKBW param inside Qwifi must be defined')
- if "port" in varlist:
- self.__port = varlist["port"]
- else:
- raise Exception('port param inside Qwifi must be defined')
- self.__numbytestx = "10M"
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QwifiName = varlist.get('name', 'qwifi')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1"))
+ self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000"))
+ self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0"))
+ self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost"))
+ self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10"))
+ self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk"))
+ self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5"))
+ self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10"))
+ self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json"))
+ self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json"))
+ self.__file_uuid = uuid.uuid4()
+ self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid)
+ self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid)
self.__resultlist = []
- def execute(self):
- # check if the board is connected to the router by wifi
- p = sh.iw("wlan0", "link")
- if p.exit_code == 0:
- # get the first line of the output stream
- out1 = p.stdout.decode('ascii').splitlines()[0]
- if out1 != "Not connected.":
- # check if the board has ip in the wlan0 interface
- p = sh.ifconfig("wlan0")
- if p.exit_code == 0:
- # check if wlan0 has an IP
- result = re.search(
- 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)',
- p.stdout.decode('ascii'))
- if result:
- # execute iperf command against the server, but it implements attempts in case the server is busy
- iperfdone = False
- while not iperfdone:
- try:
- p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port,
- "-J", _timeout=20)
- iperfdone = True
- except sh.TimeoutException:
- self.fail("failed: iperf timeout reached")
- except sh.ErrorReturnCode:
- time.sleep(self.timebetweenattempts)
-
- # check if it was executed succesfully
- if p.exit_code == 0:
- if p.stdout == "":
- self.fail("failed: error executing iperf command")
- # analyze output string
- data = json.loads(p.stdout.decode('ascii'))
- self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024
- # save result file
- with open('/mnt/station_ramdisk/wifi-iperf3.json', 'w') as outfile:
- json.dump(data, outfile, indent=4)
- outfile.close()
- self.__resultlist.append(
- {
- "description": "iperf3 output",
- "filepath": "/mnt/station_ramdisk/wifi-iperf3.json",
- "mimetype": "application/json"
- }
- )
+ def checkInterface(self, reqInterface):
+ ifaces = netifaces.interfaces()
+ for t in ifaces:
+ if t == reqInterface:
+ return True
+ return False
- # check if BW is in the expected range
- if self.__bwreal < float(self.__bwexpected):
- self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal))
- else:
- self.fail("failed: could not complete iperf3 command")
- else:
- self.fail("failed: wlan0 interface doesn't have any ip address.")
+ def execute(self):
+ self.__resultlist = []
+ stationList = {}
+ # check interfaces
+ if not self.checkInterface(self.__interface):
+ self.fail('Interface not found: {}'.format(self.__interface))
+ # scan networks
+ scanner = iwScan(self.__interface)
+ if scanner.scan():
+ stationList = scanner.getStations()
+ # check if we are connected
+ if not scanner.findConnected():
+ self.fail('Not connected to test AP')
+ else:
+ strError = scanner.getLastError()
+ self.fail('qWifi scanner Execution error: {}'.format(strError))
+ # Start Iperf
+ client = iperf3.Client()
+ client.duration = int(self.__duration)
+ client.server_hostname = self.__serverip
+ client.port = station2Port(self.__port)
+ count = int(self.__retriesCount)
+ while count > 0:
+ result = client.run()
+ if result.error == None:
+ if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw):
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file),
+ description='wifi scan',
+ mime='application/json',
+ json_data=stationList,
+ result=self.__resultlist)
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file),
+ description='wifi {} iperf3'.format(self.__interface),
+ mime='application/json',
+ json_data=result.json,
+ result=self.__resultlist)
+ return
else:
- self.fail("failed: could not complete ifconfig command.")
+ self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps))
else:
- self.fail("failed: wifi module is not connected to the router.")
- else:
- self.fail("failed: could not execute iw command")
+ count = count - 1
+ time.sleep(int(self.__waitRetryTime))
+
+ self.fail('qWifi (max tries) Execution error: {}'.format(result.error))
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test_main.py b/test-cli/test_main.py
index 942df61..3f52152 100644
--- a/test-cli/test_main.py
+++ b/test-cli/test_main.py
@@ -1,8 +1,17 @@
+import time
+import sh
+import os
+import unittest
+import socket
+import logging
+import sys
+import getopt
+import random
+
+from datetime import datetime
from test.helpers.int_registers import get_die_id
from test.helpers.int_registers import get_internal_mac
from subprocess import call
-import os
-import unittest
from test.helpers.testsrv_db import TestSrv_Database
from test.helpers.setup_xml import XMLSetup
from test.runners.simple import SimpleTestRunner
@@ -11,7 +20,6 @@ from test.tests.qram import Qram
from test.tests.qusb import Qusb
from test.tests.qeeprom import Qeeprom
from test.tests.qserial import Qserial
-from test.tests.qusbdual import Qusbdual
from test.tests.qwifi import Qwifi
from test.tests.qrtc import Qrtc
from test.tests.qduplex_ser import Qduplex
@@ -19,25 +27,28 @@ from test.tests.qamper import Qamper
from test.tests.qnand import Qnand
from test.tests.qaudio import Qaudio
from test.tests.qdmesg import Qdmesg
+from test.tests.qmmcflash import Qmmcflash
+from test.tests.qplc import Qplc
from test.helpers.globalVariables import globalVar
-import socket
-from test.helpers.iseelogger import ISEE_Logger
-import logging
-from test.tasks.flasheeprom import flash_eeprom
-from test.tasks.flashmemory import flash_memory
+from test.tasks.flasheeprom import EEprom_Flasher
+from test.tasks.flashmemory import NAND_Flasher
from test.helpers.qrreader import QRReader
-from test.helpers.cmdline import LinuxKernel
+from test.helpers.cmdline import LinuxKernelCmd
from test.helpers.amper import Amper
from test.enums.StationStates import StationStates
-import time
-import sh
+from test.helpers.mydebugger import sentdebugmessage
+import xml.etree.ElementTree as XMLParser
+from test.helpers.iseelogger import logObj
+from test.helpers.utils import str2bool
# global variables
psdbObj = TestSrv_Database()
xmlObj = None
-loggerObj = None
test_abspath = None
tests_manually_executed = []
+AutoTest = False
+OverrideAutoReboot = False
+delay_reboot = '0'
# define clear function
@@ -45,74 +56,62 @@ def clear():
# check and make call for specific operating system
_ = call('clear' if os.name == 'posix' else 'cls')
+def reboot_board():
+ logObj.getlogger().debug("#reboot board#")
+ if not OverrideAutoReboot:
+ set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name)
+ if delay_reboot == '0':
+ time.sleep(5)
+ sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True)
+ # sh.reboot(_bg=True)
+ print("REBOOT...")
+ exit(0)
-def execute_station_error(text):
- print("Error: {}".format(text))
- loggerObj.getlogger().info("Station error: #{}#".format(text))
+def wait_for_reboot():
while True:
currentstate = psdbObj.read_station_state(globalVar.station)
if currentstate == StationStates.STATION_REBOOT.name:
reboot_board()
- time.sleep(1)
+ time.sleep(5)
+def execute_station_error(text):
+ logObj.getlogger().error("Station error: #{}#".format(text))
+ wait_for_reboot()
def check_first_state():
currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_ERROR.name:
- while True:
- currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
- time.sleep(1)
- elif currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
+ if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name:
+ logObj.getlogger().debug('station state during the first check is {}'.format(currentstate))
+ wait_for_reboot()
else:
- starttime = time.time()
+ count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240"))
while currentstate != StationStates.WAIT_TEST_START.name:
- if time.time() - starttime >= 30.0:
+ logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate))
+ if count_t <= 0:
# Error
- execute_station_error("Timeout while waiting for WAIT_TEST_START state")
+ execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate))
else:
+ count_t = count_t - 5
time.sleep(5)
currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_ERROR.name:
- while True:
- currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
- time.sleep(1)
- elif currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
-
-
-def reboot_board():
- sh.reboot()
- exit(0)
+ if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name:
+ wait_for_reboot()
def set_next_state(newstate):
statewritten = psdbObj.change_station_state(globalVar.station, newstate)
- if statewritten == StationStates.STATION_REBOOT.name:
- reboot_board()
- elif statewritten == StationStates.STATION_ERROR.name:
- while True:
- currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
- time.sleep(1)
-
+ if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name:
+ wait_for_reboot()
def reboot_if_autotest():
# reset board if AUTOTEST is enabled
- autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station)
- if int(autotest) == 1:
+ if AutoTest:
reboot_board()
def create_paramslist(params):
paramlist = {}
- for row in params:
- varname, varvalue = row
+ for varname, varvalue in params:
paramlist[varname] = varvalue
return paramlist
@@ -151,13 +150,17 @@ def add_test(suite, testdefname, paramlist):
elif testdefname == "AUDIO":
suite.addTest(Qaudio(testdefname, "execute", paramlist))
return 0
- elif testdefname == "USBLOOP":
- suite.addTest(Qusbdual(testdefname, "execute", paramlist))
- return 0
elif testdefname == "DMESG":
tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist))
return 0
+ elif testdefname == "MMCFLASH":
+ suite.addTest(Qmmcflash(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "PLC_MODEM":
+ suite.addTest(Qplc(testdefname, "execute", paramlist))
+ return 0
else:
+ logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname))
return 1
@@ -166,9 +169,9 @@ def create_testsuite():
suite1 = unittest.TestSuite()
# get list of tests for this board
tests = psdbObj.get_tests_list(globalVar.g_uuid)
+ logObj.getlogger().debug("Create TestSuite: {}".format(tests))
# loop in every test for this board
- for row in tests:
- testid, testdefname = row
+ for testid, testdefname in tests:
# get params for this test
params = psdbObj.get_test_params_list(testid)
paramlist = create_paramslist(params)
@@ -176,6 +179,8 @@ def create_testsuite():
paramlist["testid"] = testid
paramlist["boarduuid"] = globalVar.g_uuid
paramlist["testidctl"] = globalVar.testid_ctl
+ paramlist['testPath'] = test_abspath
+ logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist))
paramlist["xml"] = xmlObj
paramlist["db"] = psdbObj
# add test to TestSuite
@@ -185,44 +190,123 @@ def create_testsuite():
def create_board():
- cmd = LinuxKernel()
+
+ cmd = LinuxKernelCmd()
model_id = cmd.getkvar("bmodel", "none")
variant = cmd.getkvar("bvariant", "none")
+ logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant))
if model_id == "none" or variant == "none":
# Error
- execute_station_error("Cannot get model and variant information")
+ execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant))
# get model id
globalVar.g_mid = model_id + "-" + variant
+ logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid))
# get station
if "Station" not in globalVar.station:
# Error
- execute_station_error("Wrong station name")
+ execute_station_error("Hostname not defined")
processor_id = get_die_id(globalVar.g_mid)
- print(globalVar.g_mid)
- print(processor_id)
globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station,
get_internal_mac(globalVar.g_mid))
- print(globalVar.g_uuid)
+
+ logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid))
def get_taskvars_list(uuid):
varlist = {}
- for row in psdbObj.get_task_variables_list(uuid):
- varname, varvalue = row
+ for varname, varvalue in psdbObj.get_task_variables_list(uuid):
varlist[varname] = varvalue
return varlist
+def execute_extra_task():
+ # list of task
+ myTask = []
+ # Change state to "EXTRATASKS_RUNNING"
+ set_next_state(StationStates.EXTRATASKS_RUNNING.name)
+ # create task control
+ globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
+ # get extra variables
+ varlist = get_taskvars_list(globalVar.g_uuid)
+
+ # Put default Vars
+ varlist["boarduuid"] = globalVar.g_uuid
+ varlist["testidctl"] = globalVar.testid_ctl
+ varlist['testPath'] = test_abspath
+ varlist["xml"] = xmlObj
+ varlist["db"] = psdbObj
+
+ # Get list Extra Task and activation
+ eeprom = varlist.get('eeprom', '0')
+ flashNand = varlist.get('flashNAND', '0')
+
+ if eeprom == '1':
+ myEEprom = EEprom_Flasher(varlist)
+ myTask.append(myEEprom)
+ if flashNand == '1':
+ myNAND = NAND_Flasher(varlist)
+ myTask.append(myNAND)
+
+ taskresult = True
+ for task in myTask:
+ res, data = task.Execute()
+ if res:
+ psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data)
+ logObj.getlogger().info("task: {} ok".format(task.getTaskName()))
+ else:
+ psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data)
+ logObj.getlogger().info("task: {} failed".format(task.getTaskName()))
+ taskresult = False
+
+ # check final taskresult => ok or fail
+ if taskresult:
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
+ else:
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
+ for task in myTask:
+ if task.getError() != '':
+ execute_station_error("Unable to complete extra tasks: {}".format(task.getError()))
+ # Not emit a error message
+ execute_station_error("Unable to complete extra tasks")
+
+def getBarCode(fake):
+
+ set_next_state(StationStates.WAITING_FOR_SCANNER.name)
+ if fake:
+ psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001)))
+ return
+ qrreceived = False
+ wait_count = psdbObj.get_setup_variable("QRWAIT_SEC", xmlObj.getKeyVal("test_main", "qr_wait_time", "120"))
+ while not qrreceived or int(wait_count) > 0:
+ qr = QRReader()
+ if qr.openQR():
+ # waits 5s to receive a valid code
+ if qr.readQRasync(10):
+ qrreceived = True
+ factorycode = qr.getQRNumber()
+ psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
+ return
+ qr.closeQR()
+ else:
+ qrreceived = True
+ del qr
+ time.sleep(1)
+ wait_count = wait_count - 1
+ if int(wait_count) == 0:
+ execute_station_error("Cannot find a barcode scanner")
+
-def main():
+def execute_test ():
# initialize the board
create_board()
# create a process
globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)
+ logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl))
+
# Change state to "TESTS_RUNNING"
set_next_state(StationStates.TESTS_RUNNING.name)
@@ -230,71 +314,29 @@ def main():
suite1 = create_testsuite()
# Execute tests (round 1)
runner1 = SimpleTestRunner(psdbObj)
+ logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl))
testresult = runner1.run(suite1)
+ logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult))
# Execute manually tests
for test in tests_manually_executed:
- test.execute()
+ logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName()))
+ mtestResult = test.execute()
+ logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(),
+ 'ok' if mtestResult else 'fail'))
# execute aditional tasks, only if the test was successful
if testresult.wasSuccessful():
- # Change state to "TESTS_OK"
+ logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl))
set_next_state(StationStates.TESTS_OK.name)
- # Change state to "EXTRATASKS_RUNNING"
- set_next_state(StationStates.EXTRATASKS_RUNNING.name)
- # create task control
- globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
- # get extra variables
- varlist = get_taskvars_list(globalVar.g_uuid)
-
- alltasksok = False
-
- # flash eeprom
- resulteeprom = 0
- if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0:
- mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid)
- resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0)
- psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM",
- "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata)
-
- # flash non-volatile memory
- resultmemory = 0
- if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0:
- resultmemory = flash_memory(varlist["flashimagepath"])
- psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY",
- "TASK_OK" if resultmemory == 0 else "TASK_FAIL",
- varlist["flashimagepath"])
-
- # update status with the result
- if resulteeprom == 0 and resultmemory == 0:
- # finish tasks
- psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
-
- # Change state to "WAITING_FOR_SCANNER"
- set_next_state(StationStates.WAITING_FOR_SCANNER.name)
-
- # get barcode using the scanner, only if autotest is disabled
- autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station)
- if int(autotest) == 1:
- psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code
- else:
- qrreceived = False
- while not qrreceived:
- qr = QRReader()
- if qr.openQR():
- # waits 5s to receive a valid code
- if qr.readQRasync(5):
- qrreceived = True
- factorycode = qr.getQRNumber()
- psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
- qr.closeQR()
-
- # Change state to "FINISHED"
- set_next_state(StationStates.FINISHED.name)
- else:
- psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
- loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#")
+ logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl))
+ execute_extra_task()
+ logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl))
+ getBarCode(AutoTest)
+ set_next_state(StationStates.FINISHED.name)
+ logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl))
else:
+ logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl))
# Change state to "TESTS_FAILED"
set_next_state(StationStates.TESTS_FAILED.name)
@@ -302,28 +344,75 @@ def main():
reboot_if_autotest()
+def enableFaulthandler():
+ """ Enable faulthandler for all threads.
+
+ If the faulthandler package is available, this function disables and then
+ re-enables fault handling for all threads (this is necessary to ensure any
+ new threads are handled correctly), and returns True.
+
+ If faulthandler is not available, then returns False.
+ """
+ try:
+ import faulthandler
+ # necessary to disable first or else new threads may not be handled.
+ faulthandler.disable()
+ faulthandler.enable(all_threads=True)
+ return True
+ except ImportError:
+ return False
+
if __name__ == "__main__":
# Clear the shell screen
clear()
- test_abspath = os.path.dirname(os.path.abspath(__file__))
- # create logger
- loggerObj = ISEE_Logger(logging.INFO)
+ enableFaulthandler()
+
+ test_abspath = os.path.dirname(os.path.abspath(__file__))
+ configFile = 'setup.xml'
+ loglevel = 'INFO'
+ logObj.getlogger().info("Test program Started : {}".format(datetime.now()))
+ try:
+ argv = sys.argv[1:]
+ opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel'])
+ for opt, args in opts:
+ if opt in ('-c', '--setup'):
+ configFile = args
+ if opt in ('-l', '--loglevel'):
+ loglevel = args
+ except getopt.GetoptError as err:
+ logObj.getlogger().error('#{}#'.format(err))
+
+ if loglevel != 'INFO':
+ logObj.setLogLevel(loglevel)
# Try to parse the setup.xml file
+ logObj.getlogger().debug("parse config file: {}". format(configFile))
try:
- xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml"))
- except:
+ xmlObj = XMLSetup(os.path.join(test_abspath, configFile))
+ except XMLParser.ParseError as error:
# Error
- execute_station_error("Cannot parse setup.xml file")
+ execute_station_error("Error parsing {} file {}".format(configFile, error.msg))
# Try to connect to the DB, according to setup.xml configuration
if not psdbObj.open(xmlObj):
# Error
execute_station_error("Cannot open DB connection")
+ else:
+ logObj.getlogger().debug("database connection failed: {}".format(psdbObj.getConfig()))
# get station name
globalVar.station = socket.gethostname()
+ logObj.getlogger().info("Test Station: {}".format(globalVar.station))
+
+ if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1':
+ AutoTest = True
+
+ OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0"))
+ delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0")
+
+ logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot))
+
# Check if current state is "WAIT_TEST_START". if not, waits here
check_first_state()
@@ -331,28 +420,7 @@ if __name__ == "__main__":
# Change state to "TESTS_CHECKING_ENV"
set_next_state(StationStates.TESTS_CHECKING_ENV.name)
- # Wait before beginning
- time.sleep(2)
-
- # Look for a barcode scanner
- qr = QRReader()
- if qr.openQR():
- qr.closeQR()
- else:
- # Error
- execute_station_error("Cannot find a barcode scanner")
-
- # Look for an amperimeter
- amp = Amper()
- if not amp.open():
- # Error
- execute_station_error("Cannot open an amperimeter COM port")
- if not amp.hello():
- if not amp.hello():
- # Error
- execute_station_error("Cannot open find an amperimeter")
-
- # Execute main
- main()
+ # execute main program
+ execute_test()
exit(0)