diff options
author | Manel Caro <mcaro@iatec.biz> | 2021-11-06 16:28:38 +0100 |
---|---|---|
committer | Manel Caro <mcaro@iatec.biz> | 2021-11-06 16:28:38 +0100 |
commit | cf19bfe18cbd283b188a858ee1629f9909c924f4 (patch) | |
tree | 1efb23519727130058401df090ab1b5f4cc8ba99 /test-cli | |
parent | b6932fbaf898724ae87c29f8965621610f377084 (diff) | |
parent | d5b273a3b58a250742049df4ca0ef0ba54f53d33 (diff) | |
download | board-cf19bfe18cbd283b188a858ee1629f9909c924f4.zip board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.gz board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.bz2 |
Diffstat (limited to 'test-cli')
71 files changed, 3534 insertions, 1111 deletions
diff --git a/test-cli/.idea/dictionaries/hfernandez.xml b/test-cli/.idea/dictionaries/hfernandez.xml new file mode 100644 index 0000000..c85feeb --- /dev/null +++ b/test-cli/.idea/dictionaries/hfernandez.xml @@ -0,0 +1,3 @@ +<component name="ProjectDictionaryState"> + <dictionary name="hfernandez" /> +</component>
\ No newline at end of file diff --git a/test-cli/.idea/inspectionProfiles/profiles_settings.xml b/test-cli/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..dd4c951 --- /dev/null +++ b/test-cli/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,7 @@ +<component name="InspectionProjectProfileManager"> + <settings> + <option name="PROJECT_PROFILE" value="Default" /> + <option name="USE_PROJECT_PROFILE" value="false" /> + <version value="1.0" /> + </settings> +</component>
\ No newline at end of file diff --git a/test-cli/.idea/misc.xml b/test-cli/.idea/misc.xml new file mode 100644 index 0000000..3999087 --- /dev/null +++ b/test-cli/.idea/misc.xml @@ -0,0 +1,7 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="JavaScriptSettings"> + <option name="languageLevel" value="ES6" /> + </component> + <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.6" project-jdk-type="Python SDK" /> +</project>
\ No newline at end of file diff --git a/test-cli/.idea/modules.xml b/test-cli/.idea/modules.xml new file mode 100644 index 0000000..fc2e93a --- /dev/null +++ b/test-cli/.idea/modules.xml @@ -0,0 +1,8 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="ProjectModuleManager"> + <modules> + <module fileurl="file://$PROJECT_DIR$/.idea/test-cli.iml" filepath="$PROJECT_DIR$/.idea/test-cli.iml" /> + </modules> + </component> +</project>
\ No newline at end of file diff --git a/test-cli/.idea/test-cli.iml b/test-cli/.idea/test-cli.iml new file mode 100644 index 0000000..218fb9e --- /dev/null +++ b/test-cli/.idea/test-cli.iml @@ -0,0 +1,12 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module type="PYTHON_MODULE" version="4"> + <component name="NewModuleRootManager"> + <content url="file://$MODULE_DIR$" /> + <orderEntry type="jdk" jdkName="Python 3.6" jdkType="Python SDK" /> + <orderEntry type="sourceFolder" forTests="false" /> + </component> + <component name="PyDocumentationSettings"> + <option name="format" value="GOOGLE" /> + <option name="myDocStringFormat" value="Google" /> + </component> +</module>
\ No newline at end of file diff --git a/test-cli/.idea/vcs.xml b/test-cli/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/test-cli/.idea/vcs.xml @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="VcsDirectoryMappings"> + <mapping directory="$PROJECT_DIR$/.." vcs="Git" /> + </component> +</project>
\ No newline at end of file diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml new file mode 100644 index 0000000..1d513ac --- /dev/null +++ b/test-cli/.idea/workspace.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="ChangeListManager"> + <list default="true" id="4991a6e0-1b9d-4824-9b6e-5ac031eb4816" name="Default Changelist" comment=""> + <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> + <change beforePath="$PROJECT_DIR$/test/tests/qusbdual.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/tests/qusbdual.py" afterDir="false" /> + <change beforePath="$PROJECT_DIR$/test_main.py" beforeDir="false" afterPath="$PROJECT_DIR$/test_main.py" afterDir="false" /> + </list> + <option name="SHOW_DIALOG" value="false" /> + <option name="HIGHLIGHT_CONFLICTS" value="true" /> + <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" /> + <option name="LAST_RESOLUTION" value="IGNORE" /> + </component> + <component name="FileTemplateManagerImpl"> + <option name="RECENT_TEMPLATES"> + <list> + <option value="Python Script" /> + </list> + </option> + </component> + <component name="Git.Settings"> + <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." /> + </component> + <component name="ProjectId" id="1YfjUHHfmr4Muw0TDMT8ItsS6oU" /> + <component name="ProjectLevelVcsManager" settingsEditedManually="true"> + <ConfirmationsSetting value="2" id="Add" /> + </component> + <component name="ProjectViewState"> + <option name="hideEmptyMiddlePackages" value="true" /> + <option name="showLibraryContents" value="true" /> + </component> + <component name="PropertiesComponent"> + <property name="ASKED_ADD_EXTERNAL_FILES" value="true" /> + <property name="RunOnceActivity.OpenProjectViewOnStart" value="true" /> + <property name="RunOnceActivity.ShowReadmeOnStart" value="true" /> + <property name="SHARE_PROJECT_CONFIGURATION_FILES" value="true" /> + <property name="WebServerToolWindowFactoryState" value="false" /> + <property name="last_opened_file_path" value="$PROJECT_DIR$/test/tests" /> + <property name="settings.editor.selected.configurable" value="configurable.group.appearance" /> + </component> + <component name="RecentsManager"> + <key name="CopyFile.RECENT_KEYS"> + <recent name="$PROJECT_DIR$/test/tests" /> + <recent name="$PROJECT_DIR$/test/tasks" /> + <recent name="$PROJECT_DIR$/test/helpers" /> + </key> + <key name="MoveFile.RECENT_KEYS"> + <recent name="$PROJECT_DIR$/test/scripts" /> + </key> + </component> + <component name="RunManager"> + <configuration name="test" type="PythonConfigurationType" factoryName="Python"> + <module name="test-cli" /> + <option name="INTERPRETER_OPTIONS" value="" /> + <option name="PARENT_ENVS" value="true" /> + <envs> + <env name="PYTHONUNBUFFERED" value="1" /> + </envs> + <option name="SDK_HOME" value="/usr/bin/python3.6" /> + <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" /> + <option name="IS_MODULE_SDK" value="false" /> + <option name="ADD_CONTENT_ROOTS" value="true" /> + <option name="ADD_SOURCE_ROOTS" value="true" /> + <option name="SCRIPT_NAME" value="$PROJECT_DIR$/test_main.py" /> + <option name="PARAMETERS" value="" /> + <option name="SHOW_COMMAND_LINE" value="false" /> + <option name="EMULATE_TERMINAL" value="false" /> + <option name="MODULE_MODE" value="false" /> + <option name="REDIRECT_INPUT" value="false" /> + <option name="INPUT_FILE" value="" /> + <method v="2" /> + </configuration> + </component> + <component name="SvnConfiguration"> + <configuration /> + </component> + <component name="TaskManager"> + <task active="true" id="Default" summary="Default task"> + <changelist id="4991a6e0-1b9d-4824-9b6e-5ac031eb4816" name="Default Changelist" comment="" /> + <created>1583340556211</created> + <option name="number" value="Default" /> + <option name="presentableId" value="Default" /> + <updated>1583340556211</updated> + <workItem from="1583340557592" duration="4841000" /> + </task> + <servers /> + </component> + <component name="TypeScriptGeneratedFilesManager"> + <option name="version" value="1" /> + </component> + <component name="VcsManagerConfiguration"> + <option name="ADD_EXTERNAL_FILES_SILENTLY" value="true" /> + </component> + <component name="WindowStateProjectService"> + <state x="668" y="354" width="640" height="396" key="#com.intellij.fileTypes.FileTypeChooser" timestamp="1593588611185"> + <screen x="67" y="27" width="1853" height="1053" /> + </state> + <state x="668" y="354" width="640" height="396" key="#com.intellij.fileTypes.FileTypeChooser/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1593588611185" /> + <state x="726" y="301" width="524" height="502" key="#com.intellij.refactoring.safeDelete.UnsafeUsagesDialog" timestamp="1593533225414"> + <screen x="67" y="27" width="1853" height="1053" /> + </state> + <state x="726" y="301" width="524" height="502" key="#com.intellij.refactoring.safeDelete.UnsafeUsagesDialog/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1593533225414" /> + <state x="482" y="187" key="SettingsEditor" timestamp="1594023927905"> + <screen x="67" y="27" width="1853" height="1053" /> + </state> + <state x="482" y="187" key="SettingsEditor/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1594023927905" /> + <state width="498" height="446" key="SwitcherDM" timestamp="1594047303932"> + <screen x="67" y="27" width="1853" height="1053" /> + </state> + <state width="498" height="446" key="SwitcherDM/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1594047303932" /> + <state x="607" y="223" width="762" height="658" key="com.intellij.openapi.editor.actions.MultiplePasteAction$ClipboardContentChooser" timestamp="1593071903123"> + <screen x="67" y="27" width="1853" height="1053" /> + </state> + <state x="607" y="223" width="762" height="658" key="com.intellij.openapi.editor.actions.MultiplePasteAction$ClipboardContentChooser/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1593071903123" /> + <state x="657" y="198" width="670" height="676" key="search.everywhere.popup" timestamp="1593083588598"> + <screen x="67" y="27" width="1853" height="1053" /> + </state> + <state x="657" y="198" width="670" height="676" key="search.everywhere.popup/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1593083588598" /> + </component> +</project>
\ No newline at end of file diff --git a/test-cli/test/suites/__init__.py b/test-cli/__init__.py index e69de29..e69de29 100644 --- a/test-cli/test/suites/__init__.py +++ b/test-cli/__init__.py diff --git a/test-cli/setup.xml b/test-cli/setup.xml index 41295b5..063d3e1 100644 --- a/test-cli/setup.xml +++ b/test-cli/setup.xml @@ -1,9 +1,38 @@ <?xml version="1.0"?> <data> <setup> - <test idline="1"/> <!-- Test line identify --> - <board model="IGEP0000" variant="TEST-TEST" station="1"/> - <db dbname="testsrv" type="PgSQLConnection" host="192.168.2.79" port="5432" user="admin" password="Idkfa2009" /> <!-- database setup --> + <test_main qr_wait_time="240" autotest="0" OverrideAutoReboot="1" delayReboot="1" wait_test_start="240" overrideCheckWaitStart="1"/> + <db dbname="testsrv" type="PgSQLConnection" host="dbtest01.isee.biz" port="5432" user="admin" password="Idkfa2009" /> + <qaudio play_dtmf_file="/root/hwtest-files/dtmf-13579.wav" to="/mnt/station_ramdisk" record_dtmf_file="dtmf_recorded_{}.wav" recordtime="2.00" rate="8000" aplay_run_delay="0.1"/> + <qusb loops="3" pathfrom="/root/hwtest-files" file="usb-test.bin" file_md5="usb-test.bin.md5" pathto="/mnt/test/disk2"/> + <qusb2 loops="3" pathfrom="/mnt/test/disk1" file="usb-test.bin" file_md5="usb-test.bin.md5" pathto="/nfs/station/tmp"/> + <qdmesg syslog_dmesg="/var/log/kern.log"/> + <qram loops="1" memsize="100M" memtesterPath="/root/hwtest-files/memtester" to="/mnt/station_ramdisk" ram_res_file="ram_res.txt"/> + <qnand nandtestPath="/root/hwtest-files/nandtest"/> + <NAND_Flasher name="NAND_Flasher" igep_flash="/usr/bin/igep-flash" skipnandtest="1"/> + <EEProm_Flasher name="EEPROM_Flasher"/> + <qwifi loops="1" interface="wlan0" bwexpected="5.0" port="5000" serverip="192.168.5.4" duration="10" to="/mnt/station_ramdisk" wifi_res_file="eth_test_{}.json" wifi_st_file="eth_st_{}.json" retries="5" wait_retry="10"/> + <qeth loops="1" interface="eth0" bwexpected="5.0" port="5000" serverip="192.168.60.3" duration="10" to="/mnt/station_ramdisk" eth_res_file="eth_test_{}.json" eth_st_file="eth_st_{}.json" retries="5" wait_retry="10"/> + <qeeprom i2c_address="0050" i2c_bus="0"/> + <qemmc device="mmcblk0" to="/mnt/station_ramdisk" emmc_res_file="emmc_status.txt"/> + <qplc firmwarepath="/root/hwtest-files/firmware" firmware="" firmware_md5="" + factory_tool="/root/hwtest-files/apps/configlayer" gen_ip="0" gen_mac="1" + mtd_device="/dev/mtd0" factory_password="betera" firmware_password="paterna" + skipflash="0" + /> + <qvideo sdl_driver="x11" sdl_display=":0" capture_size_w="1280" capture_size_h="1024" camdevice="video0" + cam_setupfile="/root/hwtest-files/board/scripts/v4l-cam.sh" + capture_discardframes="3" capture_framemean="3" capture_maxfailed="1" + RED_min_count="50000" BLUE_min_count="50000" GREEN_min_count="50000" + /> </setup> + <usb> + <usbdev MANUFACTURER="Linux 4.9.81+ ohci_hcd" PRODUCT="OHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0001" ignore="1"/> + <usbdev MANUFACTURER="Linux 4.9.81+ musb-hcd" PRODUCT="MUSB HDRC host driver" CLASS="09" VENDOR="1d6b" PRODID="0002" ignore="1"/> + <usbdev MANUFACTURER="Linux 4.9.81+ ehci_hcd" PRODUCT="EHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0002" ignore="1"/> + <usbdev MANUFACTURER="Linux 4.15.0-50-generic ehci_hcd" PRODUCT="EHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0002" ignore="1"/> + <usbdev MANUFACTURER="Linux 4.15.0-50-generic uhci_hcd" PRODUCT="UHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0001" ignore="1"/> + <usbdev MANUFACTURER="Linux 4.9.185+ musb-hcd" PRODUCT="MUSB HDRC host driver" CLASS="09" VENDOR="1d6b" PRODID="0002" ignore="1"/> + </usb> </data> diff --git a/test-cli/test/__init__.pyc b/test-cli/test/__init__.pyc Binary files differindex cd50092..3248efc 100644 --- a/test-cli/test/__init__.pyc +++ b/test-cli/test/__init__.pyc diff --git a/test-cli/test/enums/StationStates.py b/test-cli/test/enums/StationStates.py new file mode 100644 index 0000000..88dba1f --- /dev/null +++ b/test-cli/test/enums/StationStates.py @@ -0,0 +1,22 @@ +from enum import Enum, unique + + +@unique +class StationStates(Enum): + UNDEFINED = 1000 + FTP_KERNEL = 1 + FTP_DTB = 2 + MOUNT_NFS = 3 + KERNEL_BOOT = 4 + WAIT_TEST_START = 5 + STATION_ERROR = 2000 + TESTS_CHECKING_ENV = 6 + TESTS_RUNNING = 10 + TESTS_FAILED = 40 + TESTS_OK = 30 + EXTRATASKS_RUNNING = 100 + WAITING_FOR_SCANNER = 50 + FINISHED = 60 + STATION_REBOOT = 2001 + STATION_WAIT_SHUTDOWN = 2002 + diff --git a/test-cli/test/helpers/uboot_flasher.py b/test-cli/test/enums/__init__.py index e69de29..e69de29 100644 --- a/test-cli/test/helpers/uboot_flasher.py +++ b/test-cli/test/enums/__init__.py diff --git a/test-cli/test/files/OpenSans-Regular.ttf b/test-cli/test/files/OpenSans-Regular.ttf Binary files differnew file mode 100644 index 0000000..db43334 --- /dev/null +++ b/test-cli/test/files/OpenSans-Regular.ttf diff --git a/test-cli/test/files/dtmf-13579.wav b/test-cli/test/files/dtmf-13579.wav Binary files differdeleted file mode 100644 index 1ca5b93..0000000 --- a/test-cli/test/files/dtmf-13579.wav +++ /dev/null diff --git a/test-cli/test/files/test_ok.png b/test-cli/test/files/test_ok.png Binary files differdeleted file mode 100644 index 6ccc7b3..0000000 --- a/test-cli/test/files/test_ok.png +++ /dev/null diff --git a/test-cli/test/files/test_pattern.png b/test-cli/test/files/test_pattern.png Binary files differdeleted file mode 100644 index 353aab5..0000000 --- a/test-cli/test/files/test_pattern.png +++ /dev/null diff --git a/test-cli/test/helpers/DiskHelpers.py b/test-cli/test/helpers/DiskHelpers.py new file mode 100644 index 0000000..79b661e --- /dev/null +++ b/test-cli/test/helpers/DiskHelpers.py @@ -0,0 +1,9 @@ +import stat +import os + + +def disk_exists(path): + try: + return stat.S_ISBLK(os.stat(path).st_mode) + except: + return False diff --git a/test-cli/test/helpers/__init__.pyc b/test-cli/test/helpers/__init__.pyc Binary files differindex 7d1c907..01014e2 100644 --- a/test-cli/test/helpers/__init__.pyc +++ b/test-cli/test/helpers/__init__.pyc diff --git a/test-cli/test/helpers/amper.py b/test-cli/test/helpers/amper.py new file mode 100644 index 0000000..ea719f6 --- /dev/null +++ b/test-cli/test/helpers/amper.py @@ -0,0 +1,154 @@ +import serial +import scanf + + +class Amper(object): + __ser = None + __port = '/dev/ttyUSB0' + __speed = 115200 + __parity = None + __rtscts = 0 + __timeout = 1 + __isThere = False + __version = None + __voltage = 0.0 + __current = 0.0 + __alarm_condition = 0 + + def __init__(self, port='/dev/ttyUSB0', serial_speed=115200, parity=serial.PARITY_NONE, rtscts=0): + self.__port = port + self.__speed = serial_speed + self.__parity = parity + self.__rtscts = rtscts + + def open(self): + try: + if self.__ser is not None: + self.close() + self.__ser = serial.Serial(port=self.__port, + baudrate=self.__speed, + timeout=self.__timeout, + parity=self.__parity, + rtscts=self.__rtscts, + bytesize=serial.EIGHTBITS, + stopbits=serial.STOPBITS_ONE, + xonxoff=0) + + self.__ser.flushInput() + self.__ser.flushOutput() + self.__ser.break_condition = False + return True + except serial.SerialException as err: + print(err) + return False + + def close(self): + if self.__ser is not None: + self.__ser.close() + self.__ser = None + self.__isThere = False + self.__version = None + self.__voltage = 0.0 + self.__current = 0.0 + self.__alarm_condition = 0 + + def __write(self, data): + data += "\n\r" + dat = data.encode('ascii') + self.__ser.write(dat) + + def __read(self): + return self.__ser.readline().decode().rstrip() + + def __set_voltage_underlimit(self, under_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_UV_W_LIM={}'.format(under_limit)) + dat = self.__read() + print(dat) + + def __get_voltage_underlimit(self): + self.__write('AT+VIN_UV_W_LIM=?') + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def __set_voltage_overlimit(self, over_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_OV_W_LIM={}'.format(over_limit)) + dat = self.__read() + print(dat) + + def __get_voltage_overlimit(self): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_OV_W_LIM?') + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def __set_current_overlimit(self, over_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+IOUT_W_LIM={}'.format(over_limit)) + dat = self.__read() + print(dat) + + def __get_current_overlimit(self): + if self.__isThere and self.__ser is not None: + self.__write('AT+IOUT_W_LIM?') + dat = self.__read() + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def hello(self): + self.__isThere = False + if self.__ser is None: + return False + self.__write('at+ver?') + dat = self.__read() + res = scanf.scanf("ISEE Amper v%3c", dat) + if res is None: + return False + self.__isThere = True + self.__version = res[0] + return True + + def getVersion(self): + return self.__version + + def getVoltage(self): + if self.__isThere and self.__ser is not None: + self.__write('at+vin?') + dat = self.__read() + res = scanf.scanf("%f", dat) + self.__voltage = res[0] + # print(self.__voltage) + return self.__voltage + else: + return None + + def getCurrent(self): + if self.__isThere and self.__ser is not None: + self.__write('at+in?') + dat = self.__read() + res = scanf.scanf("%f", dat) + self.__current = res[0] + # print(self.__current) + else: + return None + return self.__current + + def getAlarm(self): + if self.__isThere and self.__ser is not None: + self.__write('at+st_mfr?') + dat = self.__read() + # print(dat) + self.__alarm_condition = (int(dat) & 0x0F) + # print(self.__alarm_condition) + return self.__alarm_condition + + def clearAlarm(self): + if self.__isThere and self.__ser is not None: + self.__write('at+CLRFAULT') + dat = self.__read() + # print(dat) + diff --git a/test-cli/test/helpers/button_script.sh b/test-cli/test/helpers/button_script.sh deleted file mode 100644 index 6908f22..0000000 --- a/test-cli/test/helpers/button_script.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env bash -i2cset -f -y 1 0x2d 0x40 0x31 -i2cset -f -y 1 0x2d 0x50 0xff -i2cget -f -y 1 0x2d 0x50 diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py new file mode 100644 index 0000000..b23df74 --- /dev/null +++ b/test-cli/test/helpers/camara.py @@ -0,0 +1,116 @@ +import cv2 +import sh + +class Camara(object): + __device_name = None + __device = None + __setupScriptPath = '' + __w = 1280 + __h = 720 + __contrast = 0.0 + __brightness = 0.0 + __saturation = 55.0 + __hue = 0.0 + __exposure = 166 + + def __init__(self, setup_script_path, device="video0", width=1280, height=720): + self.__device_name = device + self.__w = width + self.__h = height + self.__setupScriptPath = setup_script_path + + def Close(self): + if self.__device is not None: + del self.__device + self.__device = None + + def Open(self): + self.Close() + self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name)) + if self.__device.isOpened(): + self.__configure() + return True + return False + + def getSize(self): + return self.__w, self.__h + + def setSize(self, w, h): + if self.__device is not None and self.__device.isOpened(): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h) + else: + self.__w = w + self.__h = h + + def setContrast(self, newVal): + if self.__device.isOpened(): + self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal) + else: + self.__contrast = newVal + + def getContrast(self): + return self.__contrast + + def setBrightness(self, newVal): + if self.__device.isOpened(): + self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal) + else: + self.__brightness = newVal + + def getBrightness(self): + return self.__brightness + + def __configure(self): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h) + sh.bash(self.__setupScriptPath) + + def __setCamVar(self, key, val): + valold = cv2.VideoCapture.get(self.__device, key) + if valold != val: + cv2.VideoCapture.set(self.__device, key, val) + t = cv2.VideoCapture.get(self.__device, key) + return t + return val + + def __getCamVar(self, key): + return cv2.VideoCapture.get(self.__device, key); + + def getFrameCount(self): + return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE); + + def getFrame(self): + if self.__device.isOpened(): + retval, image = self.__device.read() + if retval: + return image + return None + else: + return None + + def getImageSize(self, image): + if hasattr(image, 'shape'): + return image.shape[1], image.shape[0] + else: + return (0, 0, 0) + + def showFrame(self, name, frame, w=False, maxTime=3000): + cv2.imshow(name, frame) + if w: + if maxTime == -1: + cv2.waitKey() + else: + cv2.waitKey(maxTime) + + def saveFrame(self, fileName, Frame): + cv2.imwrite(fileName, Frame) + + def readImage(self, filename): + return cv2.imread('{}'.format(filename), 0) + + def destroyWindow(self, name): + cv2.destroyWindow(name) + + def closeWindows(self): + cv2.destroyAllWindows()
\ No newline at end of file diff --git a/test-cli/test/helpers/changedir.py b/test-cli/test/helpers/changedir.py new file mode 100644 index 0000000..fad9ade --- /dev/null +++ b/test-cli/test/helpers/changedir.py @@ -0,0 +1,14 @@ +import os + + +class changedir: + """Context manager for changing the current working directory""" + def __init__(self, newPath): + self.newPath = os.path.expanduser(newPath) + + def __enter__(self): + self.savedPath = os.getcwd() + os.chdir(self.newPath) + + def __exit__(self, etype, value, traceback): + os.chdir(self.savedPath)
\ No newline at end of file diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py new file mode 100644 index 0000000..db94a1c --- /dev/null +++ b/test-cli/test/helpers/cmdline.py @@ -0,0 +1,28 @@ + +class LinuxKernelCmd: + + __kernel_vars = {} + + def __init__(self): + self.parse_kernel_cmdline() + + def parse_kernel_cmdline(self): + cmdline = open('/proc/cmdline', 'rb').read().decode() + cmd_array = cmdline.split() + i = 0 + for f in cmdline.split(): + pos = f.find('=') + if pos == -1: + cmd_array[i - 1] = cmd_array[i - 1] + " " + f + cmd_array.remove(cmd_array[i]) + else: + i += 1 + self.__kernel_vars = {} + for f in cmd_array: + dat = f.split('=') + self.__kernel_vars[dat[0]] = dat[1] + + def getkvar(self, vName, vdefault): + if vName in self.__kernel_vars: + return self.__kernel_vars[vName] + return vdefault diff --git a/test-cli/test/helpers/cv_display_test.py b/test-cli/test/helpers/cv_display_test.py index b54a698..3a3004f 100644 --- a/test-cli/test/helpers/cv_display_test.py +++ b/test-cli/test/helpers/cv_display_test.py @@ -14,8 +14,7 @@ def pattern_detect(cam_device=0): # RETURN 0 only if the test is ok msg="0" # Capture the corresponding camera device [0,1] - #capture = cv2.VideoCapture(0) - capture = cv2.VideoCapture("/dev/v4l/by-id/usb-Creative_Technology_Ltd._Live__Cam_Sync_HD_VF0770-video-index0") + capture = cv2.VideoCapture(0) try: _, image = capture.read() except: @@ -64,12 +63,10 @@ def pattern_detect(cam_device=0): # After testing the gamma factor correction is computed with the green color # gamma = gamma_factor / blue+red into green part # gamma factor = 50-100 - #gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1)) - gamma=1 + gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1)) # Adjust the image acording to this gamma value adjusted = adjust_gamma(image, gamma=gamma) -# adjusted=image - cv2.imwrite( "/home/root/result_hdmi_img.jpg", adjusted); + # Calculate again the average color using the gamma adjusted image # Crop the gamma adjusted image for wach color section red_cal = adjusted[y1:y2, xr1:xr2] @@ -109,19 +106,14 @@ def pattern_detect(cam_device=0): mask_r = mask0 + mask1 # mask_r = cv2.inRange(hsv, lower_red, upper_red) # Perform a morphological open to expand -# kernel = np.ones((5, 5), np.uint8) -# closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel) -# closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel) -# closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel) + kernel = np.ones((15, 15), np.uint8) + closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel) + closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel) + closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel) # Count the number of pixels that are not of the corresponding color (black) -# count_r = cv2.countNonZero(closing_r) -# count_b = cv2.countNonZero(closing_b) -# count_g = cv2.countNonZero(closing_g) - #----------- - count_r = cv2.countNonZero(mask_r) - count_b = cv2.countNonZero(mask_b) - count_g = cv2.countNonZero(mask_g) - #------- + count_r = cv2.countNonZero(closing_r) + count_b = cv2.countNonZero(closing_b) + count_g = cv2.countNonZero(closing_g) if (count_r < 5): msg = "RED COUNT FAIL" return msg diff --git a/test-cli/test/helpers/detect.py b/test-cli/test/helpers/detect.py new file mode 100644 index 0000000..193dabf --- /dev/null +++ b/test-cli/test/helpers/detect.py @@ -0,0 +1,61 @@ +import cv2 +import numpy as np + + +class Detect_Color(object): + oFrame = None + img_hsv = None + __red_lower1 = [0, 50, 20] + __red_upper1 = [5, 255, 255] + __red_lower2 = [175, 50, 20] + __red_upper2 = [180, 255, 255] + __blue_lower = [110, 50, 50] + __blue_upper = [130, 255, 255] + __green_lower = [36, 25, 25] + __green_upper = [86, 255, 255] + + def __init__(self, frame): + self.oFrame = frame + self.__hist() + + def __hist(self): + self.img_hsv = cv2.cvtColor(self.oFrame, cv2.COLOR_BGR2HSV) + + def getRed(self): + return self.__detect_two_areas([0, 50, 20], [5, 255, 255], [175, 50, 20], [180, 255, 255]) + + def getBlue(self): + return self.__detect_one_area([110, 50, 50], [130, 255, 255]) + + def getGreen(self): + return self.__detect_one_area([36, 25, 25], [86, 255, 255]) + + def __detect_one_area(self, lower, upper): + a_lower = np.array(lower) + a_upper = np.array(upper) + c_mask = cv2.inRange(self.img_hsv, a_lower, a_upper) + croped = cv2.bitwise_and(self.oFrame, self.oFrame, mask=c_mask) + mean_v = cv2.mean(self.oFrame, mask=c_mask) + mean = {} + mean['R'] = mean_v[2] + mean['G'] = mean_v[1] + mean['B'] = mean_v[0] + count = cv2.countNonZero(c_mask) + return c_mask, croped, mean, count + + def __detect_two_areas(self, lower1, upper1, lower2, upper2): + a1_lower = np.array(lower1) + a1_upper = np.array(upper1) + a2_lower = np.array(lower2) + a2_upper = np.array(upper2) + c_mask1 = cv2.inRange(self.img_hsv, a1_lower, a1_upper) + c_mask2 = cv2.inRange(self.img_hsv, a2_lower, a2_upper) + c_mask = cv2.bitwise_or(c_mask1, c_mask2) + croped = cv2.bitwise_and(self.oFrame, self.oFrame, mask=c_mask) + mean_v = cv2.mean(self.oFrame, mask=c_mask) + mean = {} + mean['R'] = mean_v[2] + mean['G'] = mean_v[1] + mean['B'] = mean_v[0] + count = cv2.countNonZero(c_mask) + return c_mask, croped, mean, count diff --git a/test-cli/test/helpers/finisher.py b/test-cli/test/helpers/finisher.py deleted file mode 100644 index 73142d9..0000000 --- a/test-cli/test/helpers/finisher.py +++ /dev/null @@ -1,151 +0,0 @@ -from test.helpers.syscmd import SysCommand -from test.helpers.globalVariables import globalVar -import binascii -import uuid -import subprocess -from PIL import Image, ImageDraw, ImageFont -import qrcode -import PIL - -class Finisher(object): - __muhb = None - __final_to_burn_to_eeprom = None - __qr_image = None - - def __init__(self, muhb = None): - self.__muhb = muhb - self.__final_to_burn_to_eeprom = bytearray() - self.__qr_image = None - pass - - def eeprom_burn(self): - ## Create binary file - str_cmd2 = "find /sys/ -iname 'eeprom'" - eeprom_location = SysCommand("eeprom_location", str_cmd2) - if eeprom_location.execute() == 0: - raw_out = eeprom_location.getOutput() - if raw_out == "": - self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?") - eeprom = raw_out.decode('ascii') - eeprom = eeprom.strip('\n') - ## push binary data to eeprom like if working with files - file2 = open(eeprom, "w+b") - file2.write(self.__final_to_burn_to_eeprom) - else: - self.fail("failed: could not complete find eeprom command") - - def generate_qr_stamp(self): - # Generate QR to put in a stamp - qr = qrcode.QRCode( - version=1, - error_correction=qrcode.constants.ERROR_CORRECT_L, - box_size=10, - border=4, - ) - # Use board_uuid to generate the QR code - qr.add_data(globalVar.g_uuid) - qr.make(fit=True) - # Save QR as image stamp.png - img = qr.make_image() - # Store QR as a class atrib - self.__qr_image = img - img.save('/home/root/stamp.png') - - def print_stamp(self): - # Print stamp by sending a cmd to the printer - str_cmd3 = "lpr -o scaling=4 stamp.png".format(self.__display) - printstamp = SysCommand("printstamp", str_cmd3) - if printstamp.execute() != 0: - self.fail("failed: could not print stamp") - - def generate_screen(self): - # Generate green image with board uuid and QR maybe pasted on top of green image - # Define image size - W = 1250 - H = 703 - # Create blank rectangle to write on - image = Image.new('RGB', (W, H), (46, 204, 113, 0)) - draw = ImageDraw.Draw(image) - message = "TEST OK\n\n" + "Board UUID: "+ globalVar.g_uuid - font = ImageFont.truetype('/usr/share/fonts/truetype/dejavuu/DejaVuSans.ttf', 40) - # Calculate the width and height of the text to be drawn, given font size - w, h = draw.textsize(message, font=font) - # Write the text to the image, where (x,y) is the top left corner of the text - draw.text(((W-w)/2,(H-h)/2), message, align='center', font=font) - #draw.image(self.__qr_image) - image.save('/home/root/test/files/test_ok.png') - - def show_result_screen(self): - # If test OK show test_OK.png located in /home/root, If test fail show test_fail.png, located in /home/root/test/files/test_KO.png - if globalVar.fstatus: - str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ok.png".format('fb0') - else: - str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ko.png".format('fb0') - - display_image = SysCommand("display_image", str_cmd4) - #print(display_image.execute()) - if display_image.execute() != -1: - self.fail("failed: could not display the image") - - - def end_ok(self): - # Burn retrieved igep eeprom struct - new = self.__muhb[0][0] - - # Convert from string to hex - hnew = new.encode() - # Magic_id and default crc32 0x6d, 0x6a, 0x6d, 0xe4 with endianess changed so that u-boot loads it correctly - # IF magic ever changes this magic_id should be changed. At the end always magic_id of test and u-boot should - # be the same - magic_id = bytes([0xe4, 0x6d, 0x6a, 0x6d]) - default_igep_crc32 = bytes([0x00, 0x00, 0x00, 0x00]) - - # Create bytearray - to_calculate = bytearray() - # Build the final hex binary for crc32 operation - to_calculate.extend(magic_id) - to_calculate.extend(default_igep_crc32) - to_calculate.extend(hnew) - - # Calculate crc32! - new_crc32 = binascii.crc32(to_calculate) - hnew_crc32 = new_crc32.to_bytes(4, byteorder="little") - - # Recreate final eeprom struct in bytearray - self.__final_to_burn_to_eeprom = bytearray() - self.__final_to_burn_to_eeprom.extend(magic_id) - self.__final_to_burn_to_eeprom.extend(hnew_crc32) - self.__final_to_burn_to_eeprom.extend(hnew) - self.eeprom_burn() - - # Generate QR stamp - self.generate_qr_stamp() - # Send print stamp command - #self.print_stamp() - # Generate green image with board uuid and QR maybe pasted on top of green image - self.generate_screen() - # Show test_ok.png image in screen - self.show_result_screen() - - def end_fail(self): - # Burn igep eeprom bug struct - hnew = self.__muhb.encode() - default_fail = bytes([0xD0, 0xBA, 0xD0, 0xBA]) - self.__final_burn_to_eeprom = bytearray() - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(hnew) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.eeprom_burn() - # Show test_ko.png image in screen - self.show_result_screen()
\ No newline at end of file diff --git a/test-cli/test/helpers/get_dieid.py b/test-cli/test/helpers/get_dieid.py deleted file mode 100644 index b20f143..0000000 --- a/test-cli/test/helpers/get_dieid.py +++ /dev/null @@ -1,43 +0,0 @@ -import mmap -import os -import struct -MAP_MASK = mmap.PAGESIZE - 1 -WORD = 4 -def read(addr): - """ Read from any location in memory - Returns the readed value in hexadecimal format - Keyword arguments: - - addr: The memory address to be readed. - """ - fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC) - # Map one page - mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK) - mm.seek(addr & MAP_MASK) - retval = struct.unpack('I', mm.read(WORD)) - mm.close() - os.close(fd) - return "%08X" % retval[0] - -def getRegisters(model): - if model.find("IGEP0046") == 0: - registers = [0x021BC420, 0x021BC410] - elif model.find("IGEP0000") == 0: - registers = [0x021BC420, 0x021BC410] - elif model.find("IGEP0034") == 0 or model.find("SOPA0000") == 0: - registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] - elif model.find("OMAP3") == 0: - registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] - elif model.find("OMAP5") == 0: - registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] - return registers - -def genDieid(modelid): - registers=getRegisters(modelid) - id="" - for i in range(len(registers)): - id=id+(read(registers[i])) - return id - -#if __name__ == "__main__": - #registers = [0x021BC420, 0x021BC410] - #print(main(registers)) diff --git a/test-cli/test/helpers/globalVariables.py b/test-cli/test/helpers/globalVariables.py index c4d8358..baaae57 100644 --- a/test-cli/test/helpers/globalVariables.py +++ b/test-cli/test/helpers/globalVariables.py @@ -6,5 +6,4 @@ def globalVar(): g_mid = "" outdata = "NONE" station = "" - fstatus = "" - gdisplay = None
\ No newline at end of file + taskid_ctl = "" diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py new file mode 100644 index 0000000..f127f2b --- /dev/null +++ b/test-cli/test/helpers/gpio.py @@ -0,0 +1,55 @@ +import os + +class gpio (object): + gpioNum = None + + def __init__(self, gpioNum, dir, val): + self.gpioNum = gpioNum + self.__export(gpioNum) + self.__set_dir(gpioNum, dir) + if dir == 'out': + self.__set_value(gpioNum, val) + + def set_val(self, value): + self.__set_value( self.gpioNum, value) + + + def __export(self, gpio_number): + if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/export", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __unexport(self, gpio_number): + if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/unexport", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_dir(self, gpio_number, dir): + try: + f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n") + val = f.readline() + if val != dir: + f.write(dir) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_value(self, gpio_number, value): + try: + f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n") + f.write(str(value)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True
\ No newline at end of file diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py new file mode 100644 index 0000000..133387c --- /dev/null +++ b/test-cli/test/helpers/int_registers.py @@ -0,0 +1,68 @@ +import mmap +import os +import struct +import sh + +MAP_MASK = mmap.PAGESIZE - 1 +WORD = 4 + + +def read(addr): + """ Read from any location in memory + Returns the readed value in hexadecimal format + Keyword arguments: + - addr: The memory address to be readed. + """ + fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC) + # Map one page + mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK) + mm.seek(addr & MAP_MASK) + retval = struct.unpack('I', mm.read(WORD)) + mm.close() + os.close(fd) + return "%08X" % retval[0] + +def imx8m_readid(): + f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n") + val = f.readline() + f.close() + return val.rstrip() + + +def get_die_id(modelid): + dieid = "" + + # get registers + if modelid.find("IGEP0046") == 0: + registers = [0x021BC420, 0x021BC410] + elif modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: + # registers: mac_id0_lo, mac_id0_hi, mac_id1_lo, mac_id1_hi + registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] + elif modelid.find("OMAP3") == 0 or modelid.find("IGEP0020") == 0 or modelid.find("IGEP0030") == 0: + registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] + elif modelid.find("OMAP5") == 0: + registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] + elif modelid.find("IGEP0048") == 0: + return imx8m_readid() + else: + raise Exception('modelid not defined: {}, modelid') + for rg in registers: + dieid = dieid + read(rg) + return dieid + + +def get_internal_mac(modelid): + mac = None + + if modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: + # # registers: mac_id0_lo, mac_id0_hi + # registers = [0x44e10630, 0x44e10634] + # mac = "" + # for rg in registers: + # mac = mac + read(rg) + # #erase trailing zeros + # mac = mac[4::1] + # # To be finished... + mac = sh.cat("/sys/class/net/eth0/address").strip() + + return mac diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py new file mode 100644 index 0000000..4b1087c --- /dev/null +++ b/test-cli/test/helpers/iseelogger.py @@ -0,0 +1,62 @@ +import logging +import logging.handlers +import datetime + +class ISEE_Logger(object): + __logger = None + __logHandler = None + __formater = None + __logHandlerConsole = None + + def __init__(self, level=logging.INFO): + # Create syslog logger + self.__logger = logging.getLogger('ISEE_logger') + self.__logger.setLevel(level) + self.__logHandler = logging.handlers.SysLogHandler('/dev/log') + self.__logHandlerConsole = logging.StreamHandler() + self.__formater = logging.Formatter('Python: { "loggerName":"%(name)s", "timestamp":"%(asctime)s", "pathName":"%(pathname)s", "logRecordCreationTime":"%(created)f", "functionName":"%(funcName)s", "levelNo":"%(levelno)s", "lineNo":"%(lineno)d", "time":"%(msecs)d", "levelName":"%(levelname)s", "message":"%(message)s"}') + self.__logHandler.formatter = self.__formater + self.__logHandlerConsole.formatter = self.__formater + self.__logger.addHandler(self.__logHandler) + self.__logger.addHandler(self.__logHandlerConsole) + + def setLogLevel(self, level): + if level.upper() == "DEBUG": + nlevel = logging.DEBUG + elif level.upper() == "INFO": + nlevel = logging.INFO + elif level.upper() == "ERROR": + nlevel = logging.ERROR + elif level.upper() == "WARNING": + nlevel = logging.WARNING + else: + nlevel = logging.DEBUG + + self.__logger.setLevel(nlevel) + + def getlogger(self): + return self.__logger + +class MeasureTime: + __difference = None + __first_time = None + __later_time = None + + def __init__(self): + self.__first_time = datetime.datetime.now() + + def start(self): + self.first_time = datetime.datetime.now() + + def stop(self): + self.__later_time = datetime.datetime.now() + self.__difference = self.__later_time - self.__first_time + return self.__difference.total_seconds() + + def getTime(self): + return self.__difference.total_seconds() + + +global logObj +logObj = ISEE_Logger(logging.INFO) + diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py new file mode 100644 index 0000000..8e29448 --- /dev/null +++ b/test-cli/test/helpers/iw.py @@ -0,0 +1,124 @@ +from sh import iw +from scanf import scanf + +class iw_scan: + __interface = None + __raw_unparsed = [] + __raw = None + + def __init__(self, interface='wlan0'): + self.__interface = interface + + def scan(self): + self.__raw_unparsed = [] + self.__raw = iw('{}'.format(self.__interface), 'scan') + if self.__raw.exit_code == 0: + self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n') + return True + return False + + def getRawData (self): + return self.__raw_unparsed + + def getInterface(self): + return self.__interface + + def getRawObject(self): + return self.__raw + + +class iwScan: + __iw_scan = None + __station_list = {} + + def __init__(self, interface='wlan0'): + self.__iw_scan = iw_scan(interface) + + def scan(self): + if self.__iw_scan.scan(): + self.__parse_scan(self.__iw_scan.getRawData()) + return True + return False + + def getLastError(self): + rObject = self.__iw_scan.getRawObject() + if rObject.exit_code != 0: + return rObject.stderr + return '' + + def __Parse_BBS(self, line): + data = {} + res = scanf("BSS %s(on %s) -- %s", line) + if res == None: + res = scanf("BSS %s(on %s)", line) + data["MAC"] = res[0] + data["DEV"] = res[1] + if len(res) == 3: + data["ASSOCIATED"] = 'YES' + else: + data["ASSOCIATED"] = 'NO' + return data + + def __Parse_T(self, id, OnNotFoundAttr ,line): + data = {} + res = scanf("{}: %s".format(id), line) + if res == None: + data[id.upper()] = OnNotFoundAttr + else: + data[id.upper()] = res[0] + return data + + def __Parse_X(self, line): + data = {} + res = scanf("DS Parameter set: channel %s", line) + if res == None: + data['CHANNEL'] = '-1' + else: + data['CHANNEL'] = res[0] + return data + + def __block_stationlist(self, rawdata): + list = {} + count = 0 + for line in rawdata: + idx = line.find('BSS') + if idx != -1 and idx == 0: + if len(list) > 0: + count += 1 + self.__station_list[count] = list + list = self.__Parse_BBS(line) + elif line.find('SSID:') != -1: + list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)} + elif line.find('freq:') != -1: + list = {**list, **self.__Parse_T('freq', '', line)} + elif line.find('signal:') != -1: + list = {**list, **self.__Parse_T('signal', '', line)} + elif line.find('DS Parameter set:') != -1: + list = {**list, **self.__Parse_X(line)} + if count > 0: + count += 1 + self.__station_list[count] = list + + def __parse_scan(self, rawData): + self.__station_list = {} + self.__block_stationlist(rawData) + #print('{}'.format(self.__station_list)) + #for i in self.__station_list: + # print(self.__station_list[i]) + + def findConnected (self): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('ASSOCIATED', 'NO') == 'YES': + return True, self.__station_list[i] + return False, None + + def findBySSID(self, ssid): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('SSID', '') == ssid: + return True, self.__station_list[i] + return False, None + + def getStations(self): + return self.__station_list
\ No newline at end of file diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py new file mode 100644 index 0000000..9bc9e23 --- /dev/null +++ b/test-cli/test/helpers/md5.py @@ -0,0 +1,8 @@ +import hashlib + +def md5_file(fname): + hash_md5 = hashlib.md5() + with open(fname, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest()
\ No newline at end of file diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py new file mode 100644 index 0000000..069058d --- /dev/null +++ b/test-cli/test/helpers/plc.py @@ -0,0 +1,82 @@ +import sh +from sh import flashcp +from sh import flash_eraseall +from sh import ErrorReturnCode +from sh import Command +from test.helpers.gpio import gpio +import time + + +class dcpPLC(object): + __nReset = None + __Phy = None + __Plc = None + __mtd_device = None + __factool = None + __myConfigTool = None + + def __init__(self, plcfactool, mtd_device): + # default save firmware + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + self.__factool = plcfactool + self.__mtd_device = mtd_device + self.__myConfigTool = Command(self.__factool) + + def setSaveFirmwareMode(self): + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + + def setBootMode(self): + self.__nRest = gpio('78', 'out', '0') + self.__nRest = gpio('75', 'out', '1') + self.__nRest = gpio('69', 'out', '1') + + def setPLCReset(self): + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + time.sleep(1) + self.__nRest = gpio('69', 'out', '1') + self.__nRest = gpio('75', 'out', '1') + + def SaveFirmware(self, firmare): + self.setSaveFirmwareMode() + try: + flash_eraseall(self.__mtd_device) + flashcp(firmare, self.__mtd_device) + except ErrorReturnCode as Error: + return False, "plc flash firmware failed {} ".format(Error.exit_code) + return True, '' + + def set_plc(self, var, value, password): + try: + res = self.__myConfigTool("-o", "SET", "-p", "{}={}".format(var, value), "-w", "{}".format(password)) + print(res) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var, Error.exit_code) + return True, '' + + def set_plc2(self, var1, value1, var2, value2, password): + try: + res = self.__myConfigTool("-o", "SET", "-p", "{}={}".format(var1, value1), "-p", + "{}={}".format(var2, value2), "-w", "{}".format(password)) + print(res) + except ErrorReturnCode as Error: + return False, "set var failed {}".format(Error.exit_code) + return True, '' + + def get_plc(self, var, value, password): + try: + self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password)) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var, Error.exit_code) + return True, '' + + def discover(self): + try: + self.__myConfigTool("-o", "DISCOVER") + except ErrorReturnCode: + return False + return True diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py index 26dd03d..0141414 100644 --- a/test-cli/test/helpers/psqldb.py +++ b/test-cli/test/helpers/psqldb.py @@ -1,34 +1,40 @@ import psycopg2 +import psycopg2.extras + class PgSQLConnection(object): - """aaaaaaa""" + """Postgres Connection Object""" __conection_object = None __db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', - 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} + 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - def __init__ (self): -# self.__conection_object = None -# if connect_str is not None: -# self.__db_config = connect_str -# else: - self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', - 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} + def __init__(self, connect_str=None): + self.__conection_object = None + if connect_str is not None: + self.__db_config = connect_str + else: + self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', + 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - def db_connect (self, connect_str): + def db_connect(self, connect_str=None): result = False try: if connect_str == None: self.__conection_object = psycopg2.connect(**self.__db_config) else: - self.__db_config = connect_str; + self.__db_config = connect_str self.__conection_object = psycopg2.connect(**self.__db_config) + self.__conection_object.autocommit = True result = True except Exception as error: print(error) return result + def getConfig(self): + return self.__db_config + def db_execute_query(self, query): cur = self.__conection_object.cursor() cur.execute(query) @@ -36,20 +42,26 @@ class PgSQLConnection(object): cur.close() return data + def db_upload_large_file(self, filepath): + # a new connection must be created to use large objects + __conn = psycopg2.connect(**self.__db_config) + lobj = __conn.lobject(oid=0, mode="rw", new_oid=0, new_file=filepath) + fileoid = lobj.oid + lobj.close() + __conn.commit() + __conn.close() + return fileoid + def commit(self): self.__conection_object.commit() - def db_close(self): if self.__conection_object is not None: self.__conection_object.close() del self.__conection_object self.__conection_object = None - def __del__(self): if self.__conection_object is not None: self.__conection_object.close() - #print("disconnected") - - + # print("disconnected") diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py new file mode 100644 index 0000000..51e5247 --- /dev/null +++ b/test-cli/test/helpers/qrreader.py @@ -0,0 +1,123 @@ +import evdev +from evdev import InputDevice, categorize, ecodes +import threading +import time +import selectors +from selectors import DefaultSelector, EVENT_READ +from test.helpers.iseelogger import logObj + +selector = selectors.DefaultSelector() + +qrdevice_list = [ + "Honeywell Imaging & Mobility 1900", + "Manufacturer Barcode Reader", + "SM SM-2D PRODUCT HID KBW", + "Honeywell Imaging & Mobility 1400g" +] + +qrkey_list = {'KEY_0': '0', + 'KEY_1': '1', + 'KEY_2': '2', + 'KEY_3': '3', + 'KEY_4': '4', + 'KEY_5': '5', + 'KEY_6': '6', + 'KEY_7': '7', + 'KEY_8': '8', + 'KEY_9': '9' + } + + +class QRReader: + __qrstr = "" + __myReader = {} + __numdigits = 10 + __dev = None + + def __init__(self): + self.getQRlist() + + def getQRlist(self): + devices = [evdev.InputDevice(path) for path in evdev.list_devices()] + logObj.getlogger().debug("{}".format(devices)) + for device in devices: + logObj.getlogger().debug("{}".format(device.name)) + if device.name in qrdevice_list: + self.__myReader['NAME'] = device.name + # print(self.__myReader['NAME']) + self.__myReader['PATH'] = device.path + # print(self.__myReader['PATH']) + self.__myReader['PHYS'] = device.phys + # print(self.__myReader['PHYS']) + + def IsQR(self): + return 'NAME' in self.__myReader + + def getQRNumber(self): + return self.__qrstr + + def openQR(self): + if self.IsQR(): + self.closeQR() + self.__dev = InputDevice(self.__myReader['PATH']) + return True + return False + + def closeQR(self): + if self.__dev: + del self.__dev + self.__dev = None + + def readQR(self): + """" Sync Read up to numdigits """ + count = 0 + self.__qrstr = "" + if self.__dev: + self.__dev.grab() + for event in self.__dev.read_loop(): + if event.type == evdev.ecodes.EV_KEY: + c_event = categorize(event) + if c_event.keycode in qrkey_list: + if c_event.keystate == 0: + self.__qrstr += qrkey_list[c_event.keycode] + count += 1 + if count == self.__numdigits: + break + self.__dev.ungrab() + return True + return False + + def wait_event(self, timeout): + selector.register(self.__dev, selectors.EVENT_READ) + while True: + events = selector.select(timeout); + if not events: + return False + else: + return True + + def readQRasync(self, timeout): + count = 0 + r = False + self.__qrstr = "" + if self.__dev: + self.__dev.grab() + if self.wait_event(timeout): + for event in self.__dev.read_loop(): + if event.type == evdev.ecodes.EV_KEY: + c_event = categorize(event) + if c_event.keycode in qrkey_list: + if c_event.keystate == 0: + self.__qrstr += qrkey_list[c_event.keycode] + count += 1 + if count == self.__numdigits: + r = True + break + self.__dev.ungrab() + return r + return False + +# qr = QRReader() +# if qr.openQR(): +# print(qr.readQRasync(2)) +# qr.closeQR() diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py new file mode 100644 index 0000000..8131a53 --- /dev/null +++ b/test-cli/test/helpers/sdl.py @@ -0,0 +1,126 @@ +import sdl2.ext +from sdl2 import * +import os +import time + +Colors = { + 'red': sdl2.ext.Color(255, 0, 0, 255), + 'green': sdl2.ext.Color(0, 255, 0, 255), + 'blue': sdl2.ext.Color(0, 0, 255, 255), + 'white': sdl2.ext.Color(255, 255, 255, 255), + 'black': sdl2.ext.Color(0, 0, 0, 255), +} + + +class SDL2(object): + __resources = None + __w = 1280 + __h = 720 + __winName = "noname" + __window = None + __factory = None + __renderer = None + __srender = None + + def __init__(self, driver, display, w, h): + os.environ['SDL_VIDEODRIVER'] = "x11" + os.environ['DISPLAY'] = ":0" + # self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files') + sdl2.ext.init() + self.__createWindow() + + def __createWindow(self): + self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0), + flags=( + SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED)) + self.__renderer = sdl2.ext.Renderer(self.__window) + self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer) + self.__srender = self.__factory.create_sprite_render_system(self.__window) + self.__window.show() + SDL_ShowCursor(SDL_DISABLE) + + # def createFont(self, fontName, fontSize, fontColor): + # return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor) + + def WriteText(self, text, x, y, fontManager): + imText = self.__factory.from_text(text, fontmanager=fontManager) + self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1])) + + def show(self): + self.__window.show() + + def hide(self): + self.__window.hide() + + def setColor(self, red, green, blue, alpha): + self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha) + + def fillColor(self, red, green, blue, alpha): + self.setColor(red, green, blue, alpha) + self.__renderer.clear() + + def fillbgColor(self, color, update=False): + if color in Colors: + self.__renderer.color = Colors[color] + self.__renderer.clear() + if update: + self.update() + + def update(self): + self.__renderer.present() + + # def showImage(self, filename): + # im = self.__factory.from_image(self.__resources.get_path(filename)) + # self.__srender.render(im) + + +class SDL2_Test(object): + __sdl2 = None + + def __init__(self, driver, display, w, h): + self.__sdl2 = SDL2(driver, display, w, h) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + +class SDL2_Message(object): + __sdl2 = None + __fontManager_w = None + __fontManager_b = None + + def __init__(self): + self.__sdl2 = SDL2() + self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white']) + self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black']) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + def __write_w(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_w) + + def __write_b(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_b) + + def setTestOK(self, board_uuid, qrdata): + self.Paint('GREEN') + self.__write_b(100, 50, 'TEST OK') + self.__write_b(100, 100, 'uuid={}'.format(board_uuid)) + self.__write_b(100, 150, 'qr={}'.format(qrdata)) + self.__sdl2.update() + + def setTestERROR(self, error): + self.Paint('RED') + self.__write_w(100, 50, 'TEST FAILED') + self.__write_w(100, 100, '{}'.format(error)) + self.__sdl2.update() + diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py index 3fd9fd5..d61d1fb 100644 --- a/test-cli/test/helpers/setup_xml.py +++ b/test-cli/test/helpers/setup_xml.py @@ -1,22 +1,22 @@ import xml.etree.ElementTree as XMLParser class XMLSetup (object): - """aaaaa""" + """XML Setup Parser""" __tree = None # Parser __dbType = None # database connection required: PgSQLConnection __dbConnectionRaw = None # Connection string in raw __dbConnectionStr = None # Connection string to use in sql object connection def __init__(self, filename): - """aaaaa""" + """Parse the file in the constructor""" self.__tree = XMLParser.parse(filename) def __del__(self): - """aaaaa""" + """Destructor do nothing""" pass - def getdbConnectionStr (self): - """aaaaa""" + def getdbConnectionStr(self): + """XML to database connection string""" if self.__dbConnectionRaw is not None: return self.__dbConnectionRaw @@ -29,12 +29,44 @@ class XMLSetup (object): return None - def getPostgresConnectionStr (self): - """aaaaa""" + def getPostgresConnectionStr(self): + """Get Connection string """ str = self.__dbConnectionRaw del str['type'] return str - def getMysqlConnectionStr (self): + def getBoard(self, key, default): + for element in self.__tree.iter('board'): + if key in element.attrib: + return element.attrib[key] + return default + + def getTempPath(self, key, default): + for element in self.__tree.iter('tmpPath'): + if key in element.attrib: + return element.attrib[key] + return default + + def getKeyVal(self, tag, key, default): + for element in self.__tree.iter(tag): + if key in element.attrib: + return element.attrib[key] + return default + + def gettagKey (self, xmltag, xmlkey): """aaaaa""" - pass
\ No newline at end of file + for element in self.__tree.iter(xmltag): + return element.attrib[xmlkey] + + return None + + def getUSBlist(self): + list = {} + count = 0 + for elements in self.__tree.iter("usbdev"): + sublist = {} + for key,val in elements.items(): + sublist[key] = val + list[count] = sublist + count += 1 + return list
\ No newline at end of file diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py index b579e39..7223774 100644 --- a/test-cli/test/helpers/syscmd.py +++ b/test-cli/test/helpers/syscmd.py @@ -1,6 +1,6 @@ import unittest import subprocess -from test.helpers.globalVariables import globalVar +import threading class TestSysCommand(unittest.TestCase): @@ -10,7 +10,7 @@ class TestSysCommand(unittest.TestCase): __outdata = None __outtofile = False - def __init__(self, testname, testfunc, str_cmd, outtofile = False): + def __init__(self, testname, testfunc, str_cmd, outtofile=False): """ init """ super(TestSysCommand, self).__init__(testfunc) self.__str_cmd = str_cmd @@ -42,7 +42,7 @@ class TestSysCommand(unittest.TestCase): else: res = -3 outdata = completed.stdout - self.longMessage=str(outdata).replace("'","") + self.longMessage = str(outdata).replace("'", "") self.assertTrue(True) except subprocess.CalledProcessError as err: self.assertTrue(False) @@ -54,11 +54,14 @@ class TestSysCommand(unittest.TestCase): def remove_file(self): pass + class SysCommand(object): __str_cmd = None __cmdname = None __outdata = None __errdata = None + __returnCode = None + __exception = None def __init__(self, cmdname, str_cmd): """ init """ @@ -66,31 +69,37 @@ class SysCommand(object): self.__cmdname = cmdname def getName(self): - return self.__testname + return self.__cmdname + + def setName(self, cmdName): + self.__cmdname = cmdName + + def getParams(self): + return self.__str_cmd + + def setParam(self, params): + self.__str_cmd = params def execute(self): - res = -1 + # try: + self.__outdata = None + self.__errdata = None + self.__exception = None try: - self.__outdata = None - self.__errdata = None completed = subprocess.run( self.__str_cmd, - check=True, + check=False, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + self.__returnCode = completed.returncode self.__outdata = completed.stdout - if completed.returncode is 0: - res = 0 - if completed.stderr.decode('ascii') != "": - res = -1 - self.__errdata = completed.stderr + self.__errdata = completed.stderr.decode('ascii') except subprocess.CalledProcessError as err: - res = -2 - except Exception as t: - res = -3 - return res + self.__exception = err + self.__returnCode = -1 + return self.__returnCode def getOutput(self): return self.__outdata @@ -98,6 +107,12 @@ class SysCommand(object): def getOutErr(self): return self.__errdata + def getException(self): + return self.__exception + + def IsException(self): + return self.__exception is not None + def getOutputlines(self): return self.__outdata.splitlines() @@ -106,3 +121,26 @@ class SysCommand(object): f.write(self.__outdata) f.close() + +class AsyncSys(threading.Thread): + sysObject = None + sysres = None + + def __init__(self, threadID, name, counter): + threading.Thread.__init__(self) + self.threadID = threadID + self.name = name + self.counter = counter + self.sysObject = SysCommand(name, None) + + def setParams(self, Params): + self.sysObject.setParam(Params) + + def getRes(self): + return self.sysres + + def getData(self): + return self.sysObject.getOutput() + + def run(self): + self.sysres = self.sysObject.execute() diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 94181f9..8e96e8c 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -1,10 +1,10 @@ from test.helpers.psqldb import PgSQLConnection -from test.helpers.setup_xml import XMLSetup -def find_between( s, first, last ): + +def find_between(s, first, last): try: - start = s.index( first ) + len( first ) - end = s.index( last, start ) + start = s.index(first) + len(first) + end = s.index(last, start) return s[start:end] except ValueError: return "" @@ -19,158 +19,242 @@ class TestSrv_Database(object): def __init__(self): pass - def open (self, filename): - '''Open database connection''' - self.__xml_setup = XMLSetup(filename) + def open(self, xmlObj): + self.__xml_setup = xmlObj self.__sqlObject = PgSQLConnection() return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr()) - def create_board(self, processor_id, model_id, variant, bmac = None): + def getConfig(self): + return self.__sqlObject.getConfig() + + def create_board(self, processor_id, model_id, variant, station, bmac=None): '''create a new board''' if bmac is None: - sql = "SELECT isee.create_board('{}', '{}', '{}', NULL);".format(processor_id, model_id, variant) + sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', NULL, '{}');".format(processor_id, model_id, + variant, + station) else: - sql = "SELECT isee.create_board('{}', '{}', '{}', '{}');".format(processor_id, model_id, variant, bmac) - #print('>>>' + sql) + sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', '{}', '{}');".format(processor_id, model_id, + variant, + bmac, station) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + print(r) + print(str(err)) return None - def create_model(self, modid, variant, descr, tgid): - '''create new model''' - sql = "SELECT isee.create_model('{}', '{}', '{}', '{}')".format(modid, variant, descr, tgid) - #print('>>>' + sql) + def get_tests_list(self, board_uuid): + '''get the board test list''' + sql = "SELECT * FROM isee.f_get_tests_list('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def create_test_definition(self, testname, testdesc, testfunc): - '''Create a new definition and return definition id on fail (testname already exist) return -1''' - sql = "SELECT isee.define_test('{}', '{}', '{}')".format(testname, testdesc, testfunc) - #print('>>>' + sql) + def get_test_params_list(self, testid): + sql = "SELECT * FROM isee.f_get_test_params_list({})".format(testid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def add_testdef_to_group(self, testgroupid, testname, testparam): - '''Assign definition to group test return true on success or false if it fails''' - sql = "SELECT isee.add_test_to_group('{}', '{}', '{}')".format(testgroupid, testname, testparam) - #print('>>>' + sql) + def open_test(self, board_uuid): + sql = "SELECT * FROM isee.f_open_test('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_test_list(self, board_uuid): - '''get the board test list''' - sql = "SELECT isee.gettestlist('{}')".format(board_uuid) - #print('>>>' + sql) + def run_test(self, testid_ctl, testid): + sql = "SELECT isee.f_run_test({},{})".format(testid_ctl, testid) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def finish_test(self, testid_ctl, testid, newstatus, textresult): + sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult) + # print('>>>' + sql) + try: + print('finish_test => SQL {}'.format(sql)) + self.__sqlObject.db_execute_query(sql) + return True + except Exception as err: + r = find_between(str(err), '#', '#') + print('finish_test => {}'.format(err)) + return False + + def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype): + try: + # generate a new oid + fileoid = self.__sqlObject.db_upload_large_file(filepath) + # insert into a table + sql = "SELECT isee.f_upload_result_file({},{},'{}','{}','{}')".format(testid_ctl, testid, desc, fileoid, + mimetype) + # print('>>>' + sql) + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def get_task_variables_list(self, board_uuid): + sql = "SELECT * FROM isee.f_get_task_variables_list('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_comp_test_list(self, board_uuid): - '''get the board test list''' - sql = "SELECT isee.gettestcompletelist('{}')".format(board_uuid) - #print('>>>' + sql) + + def get_plc_macaddr(self, board_uuid): + sql = "SELECT * FROM isee.f_get_plcmac(\'{}\')".format(board_uuid) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + return res, '' + except Exception as err: + r = find_between(str(err), '#', '#') + print(r) + return None, r + + + def get_board_macaddr(self, board_uuid): + sql = "SELECT * FROM isee.f_get_board_macaddr('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def open_testbatch(self, board_uuid): - '''get the board test list''' - sql = "SELECT isee.open_testbatch('{}')".format(board_uuid) - #print('>>>' + sql) + def open_task(self, uuid): + sql = "SELECT * FROM isee.f_open_task('{}')".format(uuid) + # print('>>>' + sql) try: - res = str(self.__sqlObject.db_execute_query(sql)[0]) - res = res.replace('(', '') - res = res.replace(')', '') - res = res.replace(',', '') - #print(res) - return res; + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def add_test_to_batch(self, board_uuid, testid, testid_ctl, result, groupid, data): - '''get the board test list''' - sql = "SELECT isee.add_test_to_batch_c('{}','{}','{}','{}','{}','{}')".format(board_uuid, testid, testid_ctl, result, groupid, data) - #print('>>>' + sql) + def create_task_result(self, taskid_ctl, name, newstatus, newinfo=None): + sql = "SELECT isee.f_create_task_result({},'{}','{}','{}')".format(taskid_ctl, name, newstatus, newinfo) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def update_taskctl_status(self, taskid_ctl, newstatus): + sql = "SELECT isee.f_update_taskctl_status({},'{}')".format(taskid_ctl, newstatus) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def set_factorycode(self, uuid, factorycode): + sql = "SELECT isee.f_set_factorycode('{}','{}')".format(uuid, factorycode) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def read_station_state(self, station): + sql = "SELECT * FROM station.getmystate('{}');".format(station) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def close_testbatch(self, board_uuid, testid_ctl): - '''get the board test list''' - sql = "SELECT isee.close_testbatch('{}','{}')".format(board_uuid, testid_ctl) - #print('>>>' + sql) + def change_station_state(self, station, newstate): + sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station) + print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + print(r) return None - def update_set_test_row(self, nstation, testid_ctl, board_uuid, ctest, cstatus): - '''get the board test list''' - sql = "SELECT isee.update_set_test_row('{}','{}','{}','{}','{}')".format(nstation, testid_ctl ,board_uuid, ctest, cstatus) - #print('>>>' + sql) + def get_setup_variable(self, skey): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_eeprom(self, board_uuid): - '''get the board eeprom struct ''' - sql = "SELECT isee.getboard_eeprom('{}')".format(board_uuid) - #print('>>>' + sql) + def get_setup_variable(self, skey , default): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return default + + def setDevelStationState(self, station, newState): + sql = "UPDATE station.station_state SET state = {} WHERE hostname={}".format(newState, station) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None -
\ No newline at end of file diff --git a/test-cli/test/helpers/usb.py b/test-cli/test/helpers/usb.py new file mode 100644 index 0000000..8f8848d --- /dev/null +++ b/test-cli/test/helpers/usb.py @@ -0,0 +1,270 @@ +from test.helpers.syscmd import SysCommand +from test.helpers.amper import Amper +import scanf +import os +import json + + +class USBDevices(object): + __dev_list = {} + __serial_converter = {} + __usb_disk = {} + __usb_amper = {} + __dev_list_filtered = {} + __usb_mstorage = {} + __xml_config = None + + def __init__(self, xml): + self.__xml_config = xml + self.__getusblist() + self.__getSerialDevices() + self.__getAmper() + self.__getMassStorage() + + def print_usb(self): + print("------ usb list -------") + print(json.dumps(self.__dev_list)) + print("------ serial converter list -------") + print(self.__serial_converter) + print("------ usb disk list -------") + print(self.__usb_disk) + print("------ usb amper list -------") + print(self.__usb_amper) + print("------ usb mass storage list -------") + print(self.__usb_mstorage) + print("------ filtered list -------") + print(json.dumps(self.__dev_list_filtered)) + + def refresh(self): + self.__dev_list = {} + self.__serial_converter = {} + self.__usb_disk = {} + self.__usb_amper = {} + self.__usb_mstorage = {} + self.__dev_list_filtered = {} + self.__getusblist() + self.__getSerialDevices() + self.__getAmper() + self.__getMassStorage() + + def getMassStorage(self): + return self.__usb_mstorage + + def getAmper(self): + return self.__usb_amper + + def __getAmper(self): + for c, s_items in self.__serial_converter.items(): + for k,v in s_items.items(): + if k.find("USB") != -1 and v == "Prolific_Technology_Inc._USB-Serial_Controller": + testAmper = Amper(port=k) + testAmper.open() + res = testAmper.hello() + if res: + self.__usb_amper = {'port': k, 'ver': testAmper.getVersion()} + testAmper.close() + if res: + return + + def __getMassStorage(self): + for c, s_items in self.__usb_disk.items(): + for k,v in s_items.items(): + if k == "udev": + if v.get('ID_MODEL', None) == "File-Stor_Gadget" or v.get('ID_FS_LABEL', None) != "NODE_L031K6": + self.__usb_mstorage = {'disk': v.get('DEVNAME', ''), 'id': v.get('ID_FS_LABEL_ENC', '')} + + + def __check_ignore_usb_list(self, usb_list, dev_list): + count = 0 + ignore = "0" + for _, l in usb_list.items(): + for k, v in l.items(): + if k == "ignore": + ignore = v + else: + value = dev_list.get(k, None) + if v == value: + count += 1 + else: + count = 0 + break + if count >= len(l) -1: + break + + if count > 0: + if ignore == "1": + return True + return False + + + def __create_ignore_list(self, usb_list): + count = 1 + for _, dev in self.__dev_list.items(): + if not self.__check_ignore_usb_list(usb_list, dev): + self.__dev_list_filtered[count] = dev + count += 1 + + def __getusblist(self): + count = 0 + res = open('/sys/kernel/debug/usb/devices', 'rb').read().decode() + list = {} + for i in res.split('\n'): + if i.find('T:') != -1: + if len(list) > 0: + count += 1 + self.__dev_list[count] = list + list = self.__parse_T(i) + elif i.find('D:') != -1: + list = {**list, **self.__parse_D(i)} + elif i.find('P:') != -1: + list = {**list, **self.__parse_P(i)} + elif i.find('S:') != -1: + list = {**list, **self.__parse_S(i)} + else: + """ Ignore all rest""" + pass + + if count > 0: + count += 1 + self.__dev_list[count] = list + + self.__create_ignore_list(self.__xml_config.getUSBlist()) + + + def __parse_T(self, str): + data = {} + res = scanf.scanf("T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c", str) + data["BUS"] = res[0] + data["LEV"] = res[1] + data["PRNT"] = res[2] + data["PORT"] = res[3] + data["CNT"] = res[4] + data["DEV"] = res[5].lstrip() + data["SPEED"] = res[6].rstrip() + data["MXCH"] = res[7].lstrip() + return data + + def __parse_D(self, str): + data = {} + str += ' ' + res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", str) + data["VER"] = res[0].lstrip() + data["CLASS"] = res[1] + return data + + def __parse_P(self, str): + data = {} + str += ' ' + res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", str) + data["VENDOR"] = res[0].lstrip() + data["PRODID"] = res[1] + return data + + def __parse_S(self, str): + data = {} + if str.find('Manufacturer') != -1: + pos = str.find('Manufacturer=') + data["MANUFACTURER"] = str[pos + len('Manufacturer='):].rstrip() + elif str.find('Product') != -1: + pos = str.find('Product=') + data["PRODUCT"] = str[pos + len('Product='):].rstrip() + elif str.find('SerialNumber') != -1: + pos = str.find('SerialNumber=') + data["SERIAL"] = str[pos + len('SerialNumber='):].rstrip() + else: + pass + return data + + def finddevicebytag(self, tag, str): + t = {} + count = 0 + for i, k in self.__dev_list.items(): + if tag in k: + if k[tag].find(str) != -1: + t[count] = k + count += 1 + return t + + def __getSerialDevices(self): + self.__serial_converter = {} + self.__usb_disk = {} + self.__usb_amper = {} + test_myPath = os.path.dirname(os.path.abspath(__file__)) + test_myPath = test_myPath + "/../" + s = SysCommand('usb', '{}/scripts/usb.sh'.format(test_myPath)) + s.execute() + data_list = s.getOutput().decode() + for i in data_list.split('\n'): + if len(i) > 0: + pos_div = i.find('-') + dev = i[0:pos_div - 1] + devName = i[pos_div + 2:] + # print("dev: {}".format(dev)) + # print("devName: {}".format(devName)) + res = scanf.scanf("/dev/sd%c", dev) + if res is not None: + dev_id = res[0][0] + # Now verify if is Disk or Partition + res = scanf.scanf("/dev/sd%c%c", dev) + if res is not None: + # this is a partition and we can ignore it + pass + else: + self.__usb_disk[dev_id] = {dev: devName, 'udev': self.__getudevquery(dev)} + else: + res = scanf.scanf("/dev/ttyUSB%c", dev) + if res is not None: + serial_id = res[0][0] + self.__serial_converter[serial_id] = {dev: devName, 'udev': self.__getudevquery(dev)} + + def __getudevquery(self, device): + list = {} + s = SysCommand('query-udev', 'udevadm info --query=all -n {}'.format(device)) + res = s.execute() + if res != 0: + return None + for i in s.getOutput().decode().split('\n'): + if i.find('P:') != -1: + list = self.__parse_udev_P(i) + elif i.find('S:') != -1: + list = {**list, **self.__parse_udev_S(i)} + elif i.find('N:') != -1: + list = {**list, **self.__parse_udev_N(i)} + elif i.find('S:') != -1: + list = {**list, **self.__parse_udev_S(i)} + elif i.find('E:') != -1: + list = {**list, **self.__parse_udev_E(i)} + return list + + def __parse_udev_P(self, str): + pos = str.find('P: ') + str = str[len('P: ') + pos:] + return {'PATH': str} + + def __parse_udev_S(self, str): + pos = str.find('P: ') + str = str[len('P: ') + pos:] + pos = str.find('disk/by-id/') + if pos != -1: + return {'DISK_BY_ID': str[len('disk/by-id/') + pos:]} + pos = str.find('disk/by-path/') + if pos != -1: + return {'DISK_BY_PATH': str[len('disk/by-path/') + pos:]} + pos = str.find('disk/by-uuid/') + if pos != -1: + return {'DISK_BY_UUID': str[len('disk/by-uuid/') + pos:]} + else: + return {} + + def __parse_udev_N(self, str): + pos = str.find('N: ') + str = str[len('N: ') + pos:] + return {'DEV_NAME': str} + + def __parse_udev_E(self, str): + pos = str.find('E: ') + str = str[len('E: ') + pos:] + pos = str.find('=') + str_key = str[:pos] + str_value = str[pos + 1:] + return {str_key: str_value} diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py new file mode 100644 index 0000000..8d2c27b --- /dev/null +++ b/test-cli/test/helpers/utils.py @@ -0,0 +1,68 @@ +import json +import socket +import os +import scanf + +def save_json_to_disk(filePath, description, mime, json_data, result): + with open(filePath, 'w') as outfile: + json.dump(json_data, outfile, indent=4) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_file_to_disk(filePath, description, mime, data, result): + with open(filePath, 'w') as outfile: + outfile.write(data) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_result (filePath, description, mime, result): + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + + +def str2bool(v): + return v.lower() in ("yes", "true", "t", "1") + +def station2Port(base): + Name = socket.gethostname() + res = scanf.scanf("Station%d", Name) + if res[0]: + NmBase = int(base) + res = int(res[0]) + return str(NmBase + res) + return base + + +def sys_read(sysnode): + try: + f = open(sysnode, "r") + data = f.readline() + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + +def removefileIfExist(fname): + if os.path.exists(fname): + try: + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 5b2da5a..c110b1c 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - """ Simple Test Runner for unittest module @@ -7,10 +5,7 @@ Simple Test Runner for unittest module import sys import unittest -import os -from test.helpers.globalVariables import globalVar -from test.helpers.testsrv_db import TestSrv_Database - +from test.helpers.iseelogger import logObj class SimpleTestRunner: @@ -22,77 +17,78 @@ class SimpleTestRunner: --------------------------------------------- """ - def __init__(self, stream=sys.stderr, verbosity=0): + __pgObj = None + + def __init__(self, pgObj, stream=sys.stderr, verbosity=0): self.stream = stream self.verbosity = verbosity + self.__pgObj = pgObj def writeUpdate(self, message): self.stream.write(message) def run(self, test): - """ Run the given test case or Test Suite. - - """ - result = TextTestResult(self) + """ Run the given test case or Test Suite. """ + result = TextTestResult(self, self.__pgObj) test(result) - result.testsRun - # self.writeUpdate("---------------------------------------------\n") return result + class TextTestResult(unittest.TestResult): # Print in terminal with colors PASS = '\033[32mPASS\033[0m\n' FAIL = '\033[31mFAIL\033[0m\n' ERROR = '\033[31mERROR\033[0m\n' - def __init__(self, runner): + __pgObj = None + + def __init__(self, runner, pgObj): unittest.TestResult.__init__(self) self.runner = runner self.result = self.ERROR + self.__pgObj = pgObj def startTest(self, test): unittest.TestResult.startTest(self, test) self.runner.writeUpdate("%s : " % test.shortDescription()) # SEND TO DB THE UPDATE THAT WE RUN EACH TEST - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") + self.__pgObj.run_test(test.params["testidctl"], test.params["testid"]) def addSuccess(self, test): unittest.TestResult.addSuccess(self, test) - self.result=self.PASS + self.result = self.PASS def addError(self, test, err): unittest.TestResult.addError(self, test, err) test.longMessage = err[1] self.result = self.ERROR + print(err) def addFailure(self, test, err): unittest.TestResult.addFailure(self, test, err) - test.longMessage=err[1] + test.longMessage = err[1] self.result = self.FAIL + logObj.getlogger().info('{}:{}'.format(test, err)) def stopTest(self, test): - unittest.TestResult.stopTest(self, test) - # display: print test result - self.runner.writeUpdate(self.result) - # DB: PREPARE THE DATA TO BE INSERTED - dbdata = {} - dbdata['uuid'] = globalVar.g_uuid - dbdata['name'] = test.shortDescription() - dbdata['result'] = self.result - dbdata['msg'] = str(test.longMessage) - #DB: INSERT IN THE DATABASE - filename='test_results.dat' - testResult = open(filename, 'a') - testResult.write(str(dbdata)) - testResult.write("\n") - testResult.close() - #CONVERT FANCY FAIL AND PASS - if self.result==self.PASS: simple_result="TRUE" - if self.result == self.FAIL: simple_result = "FALSE" - elif self.result == self.ERROR: simple_result = "FALSE" - #SEND TO DB THE RESULT OF THE TEST - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) + try: + unittest.TestResult.stopTest(self, test) + # display: print test result + self.runner.writeUpdate(self.result) + # save result data + for result in test.getresults(): + self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], + result["description"], result["filepath"], result["mimetype"]) + # SEND TO DB THE RESULT OF THE TEST + if self.result == self.PASS: + status = "TEST_COMPLETE" + resulttext = test.gettextresult() + logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext)) + else: + status = "TEST_FAILED" + resulttext = test.longMessage + logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext)) + if not self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext): + self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, "") + except Exception as E: + logObj.getlogger().error('Exception: [{}]{}:{}:{}:{}'.format(E, test.params["testidctl"], test.params["testid"], status, resulttext))
\ No newline at end of file diff --git a/test-cli/test/scripts/__init__.py b/test-cli/test/scripts/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test-cli/test/scripts/__init__.py diff --git a/test-cli/test/scripts/usb.sh b/test-cli/test/scripts/usb.sh new file mode 100755 index 0000000..52ea4b2 --- /dev/null +++ b/test-cli/test/scripts/usb.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +for sysdevpath in $(find /sys/bus/usb/devices/usb*/ -name dev); do + ( + syspath="${sysdevpath%/dev}" + devname="$(udevadm info -q name -p $syspath)" + [[ "$devname" == "bus/"* ]] && continue + eval "$(udevadm info -q property --export -p $syspath)" + [[ -z "$ID_SERIAL" ]] && continue + echo "/dev/$devname - $ID_SERIAL" + ) +done + +exit 0 diff --git a/test-cli/test/tasks/__init__.py b/test-cli/test/tasks/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test-cli/test/tasks/__init__.py diff --git a/test-cli/test/tasks/flasheeprom.py b/test-cli/test/tasks/flasheeprom.py new file mode 100644 index 0000000..775c556 --- /dev/null +++ b/test-cli/test/tasks/flasheeprom.py @@ -0,0 +1,77 @@ +import os +import binascii +from test.helpers.globalVariables import globalVar + +class EEprom_Flasher: + __Name = None + __varlist = None + __xmlObj = None + __lastError = '' + + def __init__(self, varlist): + self.__varlist = varlist + self.__xmlObj = varlist['xml'] + self.__Name = varlist.get('name_eeprom', self.__xmlObj.getKeyVal('EEProm_Flasher', 'name', 'EEProm_Flasher')) + # self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash') + # self.__ImagePath = varlist.get('flashimagepath', '') + # self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1')) + + def getTaskName(self): + return self.__Name + + def getError(self): + return self.__lastError + + def Execute(self): + return True, None + + def __generate_data_bytes(self, boarduuid, mac0, mac1): + data = bytearray() + data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E + data += bytearray([0, 0, 0, 0]) # crc32 = 0 + data += bytearray(boarduuid + "\0", + 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end + data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' + if mac1 is not None: + data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7' + else: + data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 + # calculate crc + crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') + data[4:8] = crc + return data + + + def __generate_output_data(self, data_rx): + boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql + mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii') + mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii') + smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2)) + smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2)) + eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1 + return eepromdata + + + # returns exitcode and data saved into eeprom + def flash_eeprom(self, eeprompath, boarduuid, mac0, mac1=None): + print("Start programming Eeprom...") + # check if eeprompath is correct + if os.path.isfile(eeprompath): + # create u-boot data struct + data = self.__generate_data_bytes(boarduuid, mac0, mac1) + # write into eeprom and read back + f = open(eeprompath, "r+b") + f.write(data) + f.seek(0) + data_rx = f.read(57) + for i in range(57): + if data_rx[i] != data[i]: + print("Error while programming eeprom memory.") + return 1, None + print("Eeprom programmed succesfully.") + # generated eeprom read data + eepromdata = self.__generate_output_data(data_rx) + return 0, eepromdata + else: + print("Eeprom memory not found.") + return 1, None diff --git a/test-cli/test/tasks/flashmemory.py b/test-cli/test/tasks/flashmemory.py new file mode 100644 index 0000000..ad9d92a --- /dev/null +++ b/test-cli/test/tasks/flashmemory.py @@ -0,0 +1,44 @@ +import sh +import os + +class NAND_Flasher: + __Name = None + __varlist = None + __xmlObj = None + __igepflashPath = None + __lastError = '' + + def __init__(self, varlist): + self.__varlist = varlist + self.__xmlObj = varlist['xml'] + self.__Name = varlist.get('name_flashnand', self.__xmlObj.getKeyVal('NAND_Flasher', 'name', 'NAND_Flasher')) + self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash') + self.__ImagePath = varlist.get('flashimagepath', '') + self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1')) + + def getTaskName(self): + return self.__Name + + def getError(self): + return self.__lastError + + def Execute(self): + try: + self.__lastError = '' + if not os.path.isfile(self.__ImagePath): + self.__lastError('Not Image Found: {}'.format(self.__ImagePath)) + return False, None + if self.__SkipNandtest == '1': + p = sh.bash('{}'.format(self.__igepflashPath), "--skip-nandtest", "--image", self.__ImagePath) + else: + p = sh.bash('{}'.format(self.__igepflashPath), "--image", self.__ImagePath) + if p.exit_code != 0: + return False, None + except OSError as e: + self.__lastError('Exception: {}'.format(e.strerror)) + return False, None + except Exception as Error: + self.__lastError('Exception: {}'.format(Error)) + return False, None + return True, None + diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py deleted file mode 100644 index 9fcd6d2..0000000 --- a/test-cli/test/tests/qamp.py +++ /dev/null @@ -1,87 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import serial -import time - -class Qamp(unittest.TestCase): - - def __init__(self, testname, testfunc, undercurrent=0.1, overcurrent=2): - self._current = 0.0 - self._undercurrent=undercurrent - self._overcurrent = overcurrent - self._vshuntfactor=16384.0 - self._ser = serial.Serial() - self._ser.port = '/dev/ttyUSB0' - self._ser.baudrate = 115200 - self._ser.parity = serial.PARITY_NONE - self._ser.timeout = 0 - self._ser.writeTimout = 0 - self._ser.xonxoff = False - self._ser.rtscts = False - self._ser.dsrdtr = False - super(Qamp, self).__init__(testfunc) - self._testMethodDoc = testname - - def __del__(self): - self._ser.close() - - def execute(self): - # Open Serial port ttyUSB0 - error=0 - try: - self._ser.open() - except: - self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) - error=1 - return -1 - if error==0: - # Clean input and output buffer of serial port - self._ser.flushInput() - self._ser.flushOutput() - # Send command to read Voltage at Shunt resistor - # Prepare cmd - cmd = ('at\n\r') - i=0 - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - res0 = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer - res0.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - print(res0) - #CHECK COM FIRST - cmd = ('at+in?\n\r') - i = 0 - - # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms. - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - - # Read, 1 by 1 byte - res = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer - res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - - string = ''.join(res) - string = string.replace('\n', '') - try: - self._current = float(int(string, 0)) / self._vshuntfactor - except: - self.fail("failed: CAN'T READ CONSUMPTION (CURRENT=0?)") - error=1 - return -1 - if error==0: - print(self._current) - # In order to give a valid result it is importarnt to define an under current value - if (self._current > float(self._overcurrent)): - self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - return -1 - - if (self._current < float(self._undercurrent)): - self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) - return -1 - - diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py new file mode 100644 index 0000000..96b673c --- /dev/null +++ b/test-cli/test/tests/qamper.py @@ -0,0 +1,74 @@ +import unittest +from test.helpers.amper import Amper + + +class Qamper(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + dummytest = False + + # varlist: undercurrent, overcurrent + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qamper, self).__init__(testfunc) + + if "undercurrent" in varlist: + self._undercurrent = varlist["undercurrent"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + if "overcurrent" in varlist: + self._overcurrent = varlist["overcurrent"] + else: + raise Exception('overcurrent param inside Qamp must be defined') + self._testMethodDoc = testname + self.__resultlist = [] + if "dummytest" in varlist: + self.dummytest = varlist["dummytest"] + + def saveresultinfile(self, currentvalue): + # save result in a file + with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: + n = outfile.write("Current: {} A".format(currentvalue)) + outfile.close() + self.__resultlist.append( + { + "description": "Amperimeter values", + "filepath": "/mnt/station_ramdisk/amper.txt", + "mimetype": "text/plain" + } + ) + + def execute(self): + if self.dummytest == '1': + self.current = "0.0" + self.saveresultinfile(self.current) + return + + amp = Amper() + # open serial connection + if not amp.open(): + self.fail("Error: can not open a serial port") + # check if the amperimeter is connected and working + # 2 ATTEMTS + if not amp.hello(): + if not amp.hello(): + self.fail("Error: can not communicate") + # get current value (in Amperes) + self.current = amp.getCurrent() + # save result in a file + self.saveresultinfile(self.current) + # close serial connection + amp.close() + # Check current range + if float(self.current) > float(self._overcurrent): + # Overcurrent detected + self.fail("failed: Overcurrent detected ( {} )".format(self.current)) + elif float(self.current) < float(self._undercurrent): + # Undercurrent detected + self.fail("failed: Undercurrent detected ( {} )".format(self.current)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "{} A".format(self.current) diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index a1572ca..a219a1a 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -1,37 +1,100 @@ -from test.helpers.syscmd import SysCommand import unittest -#class name +import sh +import wave +import contextlib +import uuid +import time +from test.helpers.utils import save_result +from test.helpers.iseelogger import logObj + class Qaudio(unittest.TestCase): - # Initialize the variables + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __dtmf_secuence = ["1", "3", "5", "7", "9"] + __dtmf_file = None + __file_uuid = None + __QaudioName = None - def __init__(self, testname, testfunc, dtmfFile): - # Doing this we will initialize the class and later on perform a particular method inside this class + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qaudio, self).__init__(testfunc) self._testMethodDoc = testname - self._dtmfFile=dtmfFile - self.__sum=0 - self.__refSum = 25 # 1+3+5+7+9 + self.__resultlist = [] + self.__xmlObj = varlist["xml"] + self.__QaudioName = varlist.get('name', 'qaudio') + self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav")) + self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15")) + self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav")) + self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000")) + self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk")) + self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0"))) + self.__file_uuid = uuid.uuid4() + self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid) + + def restoreAlsaCtl(self): + try: + sh.alsactl('restore'); + except Exception as E: + logObj.getlogger().error("Failed to Execite alsactl restore"); def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav 2> /dev/null & arecord -r 8000 -d 1 recorded.wav 2> /dev/null" #.format(self.__dtmfFile) - audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == 0:# BUG: Returns -1 but work - lines = audio_loop.getOutput().splitlines() - str_cmd = "multimon -t wav -a DTMF recorded.wav -q 2> /dev/null" - dtmf_decoder = SysCommand("command-name", str_cmd) - if dtmf_decoder.execute() == 0: # BUG: Returns -1 but work - self.__raw_out = dtmf_decoder.getOutput() - if self.__raw_out == "": - return -1 - lines = dtmf_decoder.getOutput().splitlines() - for i in range(0, 5): - aux=[int(s) for s in lines[i].split() if s.isdigit()] - self.__sum=self.__sum+aux[0] - self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum) + self.restoreAlsaCtl() + try: + # analize audio file + calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1) + except TypeError as Error: + self.fail("failed: TypeError: {}.".Error) + + # previous play to estabilise audio lines + p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True) + time.sleep(self.__run_delay) + p1 = sh.aplay(self.__dtmf_file_play) + p2.wait() + + p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True) + time.sleep(self.__run_delay) + p1 = sh.aplay(self.__dtmf_file_play) + p2.wait() + if p1.exit_code == 0 and p2.exit_code == 0: + p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q") + if p.exit_code == 0: + lines = p.stdout.decode('ascii').splitlines() + self.__dtmf_secuence_result = [] + for li in lines: + if li.split(" ")[0] == "DTMF:": + self.__dtmf_secuence_result.append(li.split(" ")[1]) + self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result))) + # compare original and processed dtmf sequence + if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): + for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result): + if a != b: + logObj.getlogger().debug( + "failed: sent and received DTMF sequence do not match.{} != {}".format( + self.__dtmf_secuence, self.__dtmf_secuence_result)) + self.fail("failed: sent and received DTMF sequence do not match.") + save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record), + description='sound record', + mime='audio/x-wav', + result=self.__resultlist) + else: + logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result)) + self.fail("received DTMF sequence is shorter than expected") else: - self.fail("failed: fail reading recorded file") - return -1 + self.fail("failed: unable to use multimon command.") else: - self.fail("failed: fail playing/recording file") - return -1 - return 0 + self.fail("failed: unable to play/record audio.") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" + + def calc_audio_duration(self, fname): + duration = 0.0 + with contextlib.closing(wave.open(fname, 'r')) as f: + frames = f.getnframes() + rate = f.getframerate() + duration = frames / float(rate) + f.close() + return duration diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py deleted file mode 100644 index 72a897e..0000000 --- a/test-cli/test/tests/qbutton.py +++ /dev/null @@ -1,54 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import uuid -import time - -class Qbutton(unittest.TestCase): - - def __init__(self, testname, testfunc, gpio): - if gpio == "SOPA": - super(Qbutton, self).__init__("buttonSopa") - else: - super(Qbutton, self).__init__("buttonGpio") - self._testMethodDoc = testname - - def buttonGpio(self): - print("normal-button-test-using-gpio") - self.fail("failed: GPIO BUTTON FAIL") - - def buttonSopa(self): - str_cmd = "i2cset -f -y 1 0x2d 0x40 0x31" - disable_pmic = SysCommand("disable_pmic", str_cmd) - disable_pmic.execute() - # BUG: REPEAT THIS EXECUTION TWICE BECAUSE FIRST TIME IT RETURNS AN ERROR - led_on="echo 1 > /sys/class/leds/red\:usbhost/brightness" - ledon = SysCommand("led_on", led_on) - ledon.execute() - time.sleep(0.1) - disable_pmic.execute() - if disable_pmic.execute() == 0: - str_cmd = "i2cset -f -y 1 0x2d 0x50 0xff" - reset_button = SysCommand("reset_button", str_cmd) - if reset_button.execute() == 0: - str_cmd = "i2cget -f -y 1 0x2d 0x50" - get_button_val = SysCommand("get_button_val", str_cmd) - print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n") - timeout = 0 - while timeout < 20: - if get_button_val.execute() == 0: - get_button_val.execute() - button_value = get_button_val.getOutput() - button_value=button_value.decode('ascii').split("x") - if int(button_value[1]) == 4: - timeout = 20 - time.sleep(0.5) - timeout = timeout + 1 - if timeout==20 and int(button_value[1]) == 0: - self.fail("failed: timeout exceeded") - else: - timeout = 20 - self.fail("failed: not button input") - else: - self.fail("failed: could not complete i2c reset button state") - else: - self.fail("failed: could not complete i2c disable PMIC") diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py new file mode 100644 index 0000000..39a5047 --- /dev/null +++ b/test-cli/test/tests/qdmesg.py @@ -0,0 +1,32 @@ +import sh +import os +import os.path +from os import path + + +class Qdmesg: + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + self.__testMethodDoc = testname + self.__resultlist = [] + self.pgObj = varlist["db"] + self.__xmlObj = varlist["xml"] + self.__QdmesgName = varlist.get('name', 'qdmesg') + self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg", + "/var/log/kern.log")) + + def getTestName(self): + return self.__testMethodDoc + + def execute(self): + self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) + if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)): + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "") + return False + self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file), + "{}".format(self.__syslog_dmesg_file), "text/plain") + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") + return True diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 98bda81..170039b 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -1,46 +1,79 @@ -from test.helpers.syscmd import SysCommand import unittest import uuid import serial import time + class Qduplex(unittest.TestCase): + params = None + __port1 = None + __port2 = None + __baudrate = None + __resultlist = None # resultlist is a python list of python dictionaries - def __init__(self, testname, testfunc, port1, port2, baudrate): + # varlist: port1, port2, baudrate + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qduplex, self).__init__(testfunc) - self.__port1 = port1 - self.__serial1 = serial.Serial(self.__port1, timeout=1) - self.__serial1.baudrate = baudrate - - self.__port2 = port2 - self.__serial2 = serial.Serial(self.__port2, timeout=1) - self.__serial2.baudrate = baudrate + if "port1" in varlist: + self.__port1 = varlist["port1"] + else: + raise Exception('port1 param inside Qduplex must be defined') + if "port2" in varlist: + self.__port2 = varlist["port2"] + else: + raise Exception('port2 param inside Qduplex must be defined') + if "baudrate" in varlist: + self.__baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qduplex must be defined') self._testMethodDoc = testname + self.__resultlist = [] + + # open serial connection + self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1) + self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1) def __del__(self): self.__serial1.close() self.__serial2.close() def execute(self): + # generate a random uuid + test_uuid = str(uuid.uuid4()).encode() + + # clean serial ports self.__serial1.flushInput() self.__serial1.flushOutput() self.__serial2.flushInput() self.__serial2.flushOutput() - test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2)) + self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: - if (self.__serial2.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port2)) + if self.__serial2.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port2)) - test_uuid = str(uuid.uuid4()).encode() + time.sleep(0.05) # there might be a small delay + + # clean serial ports + self.__serial1.flushInput() + self.__serial1.flushOutput() + self.__serial2.flushInput() + self.__serial2.flushOutput() + # send the uuid through serial port self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1)) + self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: - if (self.__serial1.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port1)) - + if self.__serial1.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port1)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index f72f78f..7900381 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -1,38 +1,68 @@ -from test.helpers.syscmd import SysCommand import unittest -import uuid +import os class Qeeprom(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QEepromName = None + __i2cBus = None + __device = None - def __init__(self, testname, testfunc): + # varlist: position, eeprompath + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEepromName = varlist.get('name', 'qeeprom') + self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050")) + self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0")) + self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress) + + self.__resultlist = [] + + def __check_device(self): + if os.path.isfile(self.__device): + return True + return False + + def __eeprom_write(self, data): + try: + f = open(self.__device, "wb") + f.write(data) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, '' + + def __eeprom_read(self, size): + try: + f = open(self.__device, "rb") + data = f.read(size) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + def execute(self): - str_cmd = "find /sys/ -iname 'eeprom'" - eeprom_location = SysCommand("eeprom_location", str_cmd) - if eeprom_location.execute() == 0: - self.__raw_out = eeprom_location.getOutput() - if self.__raw_out == "": - self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?") - return -1 - eeprom=self.__raw_out.decode('ascii') - test_uuid = uuid.uuid4() - str_cmd="echo '{}' > {}".format(str(test_uuid), eeprom) - eeprom_write = SysCommand("eeprom_write", str_cmd) - if eeprom_write.execute() == 0: - self.__raw_out = eeprom_write.getOutput() - if self.__raw_out == "": - self.fail("Unable to write on the EEPROM?") - return -1 - str_cmd = "head -2 {}".format(eeprom) - eeprom_read = SysCommand("eeprom_read", str_cmd) - if eeprom_read.execute() == 0: - self.__raw_out = eeprom_read.getOutput() - if self.__raw_out == "": - self.fail("Unable to read from the EEPROM?") - return -1 - if(str(self.__raw_out).find(str(test_uuid)) == -1): - self.fail("failed: READ/WRITE mismatch") - else: - self.fail("failed: could not complete find eeprom command") + if not self.__check_device(): + self.fail("cannot open device {}".format(self.__device)) + + data = bytes('AbcDeFgHijK098521', 'utf-8') + + v, w = self.__eeprom_write(data) + if not v: + self.fail("eeprom: {} write test failed".format(w)) + v, r = self.__eeprom_read(len(data)) + if not v: + self.fail("eeprom: {} read test failed".format(r)) + if r != data: + self.fail("mismatch between write and read bytes") + + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 2ac447c..7d081a1 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,57 +1,124 @@ -from test.helpers.syscmd import SysCommand import unittest - +import sh +import json +import time +import uuid +import iperf3 +import netifaces +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qethernet(unittest.TestCase): - __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None - __bind = None - __OKBW = None - - def __init__(self, testname, testfunc, sip = None, OKBW=100, bind=None): + __serverip = None + __numbytestx = None + __bwexpected = None + __port = None + params = None + __bwreal = None + __resultlist = None # resultlist is a python list of python dictionaries + timebetweenattempts = 5 + + # varlist content: serverip, bwexpected, port + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qethernet, self).__init__(testfunc) - if sip is not None: - self.__sip = sip - if sip is not None: - self.__bind = bind - self.__MB_req = '10' - self.__OKBW=OKBW self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEthName = varlist.get('name', 'qeth') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json")) + self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) + + self.__resultlist = [] + + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False def execute(self): - print - if self.__bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) - else: - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - iperf_command = SysCommand("iperf", str_cmd) - if iperf_command.execute() == 0: - self.__raw_out = iperf_command.getOutput() - if self.__raw_out == "": - return -1 - lines = iperf_command.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - #print(self.__MB_real) - #print(self.__BW_real) - self.failUnless(float(self.__BW_real)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(self.__BW_real)) + self.__resultlist = [] + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='eth {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return + else: + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) + else: + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qEth (max tries) Execution error: {}'.format(result.error)) + + def execute2(self): + # execute iperf command against the server, but it implements attempts in case the server is busy + iperfdone = False + while not iperfdone: + try: + p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J", + _timeout=20) + iperfdone = True + except sh.TimeoutException: + self.fail("failed: iperf timeout reached") + except sh.ErrorReturnCode: + time.sleep(self.timebetweenattempts) + + # check if it was executed succesfully + if p.exit_code == 0: + if p.stdout == "": + self.fail("failed: error executing iperf command") + # analyze output string + data = json.loads(p.stdout.decode('ascii')) + self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 + # save result file + with open('/mnt/station_ramdisk/ethernet-iperf3.json', 'w') as outfile: + json.dump(data, outfile, indent=4) + outfile.close() + self.__resultlist.append( + { + "description": "iperf3 output", + "filepath": "/mnt/station_ramdisk/ethernet-iperf3.json", + "mimetype": "application/json" + } + ) + + # check if BW is in the expected range + if self.__bwreal < float(self.__bwexpected): + self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.fail("failed: could not complete iperf command") + self.fail("failed: could not complete iperf command.") - def get_Total_MB(self): - return self.__MB_real; + def getresults(self): + return self.__resultlist - def get_Total_BW(self): - return self.__MB_real; + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py deleted file mode 100644 index cdc4109..0000000 --- a/test-cli/test/tests/qflash.py +++ /dev/null @@ -1,77 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -from test.helpers.globalVariables import globalVar - -class Qflasher(unittest.TestCase): - - def __init__(self, testname, testfunc): - super(Qflasher, self).__init__(testfunc) - self._testMethodDoc = testname - model=globalVar.g_mid - if model.find("IGEP0046") == 0: - self._flash_method = "mx6" - self._binlocation="/boot/" - elif model.find("IGEP0000") == 0: - self._flash_method = "mx6" - self._binlocation="/boot/" - elif model.find("IGEP0034") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - elif model.find("SOPA") == 0: - self._flash_method = "sopaflash" - elif model.find("OMAP3") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - elif model.find("OMAP5") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - - - def execute(self): - # CASE eMMC boards. - if(self._flash_method == "mx6"): - str_cmd= "dd if={}u-boot.imx of=/dev/mmcblk2 bs=512 seek=2 2>&1 >/dev/null".format(self._binlocation) - flash_command = SysCommand("flash_command", str_cmd) - if flash_command.execute() == 0: - str_cmd = "sync" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not sync") - else: - self.fail("failed: could not complete flash eMMC commands") - # CASE of nandflash boards - elif(self._flash_method == "nandti"): - str_cmd = "nandwrite -p -s 0x0 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x20000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x40000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x60000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "".format(self._binlocation, self._binlocation, self._binlocation, self._binlocation,) - flash_MLO_command = SysCommand("flash_MLO_command", str_cmd) - if flash_MLO_command.execute() == 0: - str_cmd= "nandwrite -p /dev/mtd1 {}u-boot.img 2>&1 >/dev/null".format(self._binlocation) - flash_uBoot_command = SysCommand("flash_command", str_cmd) - if flash_uBoot_command.execute() == 0: - str_cmd = "sync" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not sync") - else: - self.fail("failed: could not complete flash u-boot commnds") - else: - self.fail("failed: could not complete flash MLO commnd") - - - # CASE of SOPA0000 BOARD. FULL FLASH (u-boot+kernel+rootfs) - elif (self._flash_method == "sopaflash"): - str_cmd = "nandtest /dev/mtd0 >/dev/null" - flash_command = SysCommand("flash_command", str_cmd) - if flash_command.execute() == 0: - str_cmd = "/usr/bin/igep-flash --skip-nandtest --image /opt/firmware/demo-ti-image-*-*.tar* >/dev/null 2>&1" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not complete flashing procces") - else: - self.fail("failed: could not complete nandtest mtd0") - - else: - self.fail("failed: could not find flash method") diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py deleted file mode 100644 index 409005c..0000000 --- a/test-cli/test/tests/qi2c.py +++ /dev/null @@ -1,29 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest - -class Qi2c(unittest.TestCase): - - def __init__(self, testname, testfunc, busnum, register): - super(Qi2c, self).__init__(testfunc) - self.__busnum = busnum - self.__register = register.split("/") - self.__devices=[] - self._testMethodDoc = testname - - def execute(self): - str_cmd= "i2cdetect -a -y -r {}".format(self.__busnum) - i2c_command = SysCommand("i2cdetect", str_cmd) - if i2c_command.execute() == 0: - self.__raw_out = i2c_command.getOutput() - if self.__raw_out == "": - return -1 - lines=self.__raw_out.decode('ascii').splitlines() - for i in range(len(lines)): - if (lines[i].count('UU')): - if (lines[i].find("UU")): - self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) - for i in range(len(self.__register)): - if not(self.__register[i] in self.__devices): - self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) - else: - self.fail("failed: could not complete i2cdedtect command") diff --git a/test-cli/test/tests/qiperf.py b/test-cli/test/tests/qiperf.py deleted file mode 100644 index 126b6ee..0000000 --- a/test-cli/test/tests/qiperf.py +++ /dev/null @@ -1,53 +0,0 @@ -from test.helpers.syscmd import SysCommand - - -class QIperf(object): - __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None - __bind = None - - def __init__(self, sip = None): - if sip is not None: - self.__sip = sip - self.__MB_req = '10' - - def execute(self, sip = None, bind = None): - if sip is not None: - self.__sip = sip - - if bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) - else: - self.__bind = bind - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - t = SysCommand("iperf", str_cmd) - if t.execute() == 0: - self.__raw_out = t.getOutput() - if self.__raw_out == "": - return -1 - lines = t.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - print(self.__MB_real) - print(self.__BW_real) - else: - return -1 - return 0 - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py new file mode 100644 index 0000000..0f5a0c1 --- /dev/null +++ b/test-cli/test/tests/qmmcflash.py @@ -0,0 +1,55 @@ +import unittest +import scanf +from scanf import scanf +from sh import mmc +from sh import ErrorReturnCode +from test.helpers.utils import save_file_to_disk +from test.helpers.utils import sys_read + +class Qmmcflash(unittest.TestCase): + params = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qmmcflash, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + + self.__QeMMCName = varlist.get('name', 'qemmc') + self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk")) + self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt")) + self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0")) + self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001")) + + self.__resultlist = [] + + def execute(self): + try: + dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file), + description='eMMC health test', + mime='text/plain', + data=dataOut.stdout.decode('utf-8'), + result=self.__resultlist) + sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID) + r, data = sys_read("{}/life_time".format(sysDevice)) + if not r: + self.fail("emmc: life_time not found") + res = scanf("0x%d 0x%d", data) + if res[0] > 3 or res[1] > 3: + self.fail("emmc: review {} life_time > 3".format(sysDevice)) + r, data = sys_read("{}/pre_eol_info".format(sysDevice)) + if not r: + self.fail("emmc: pre_eol_info not found") + res = scanf("0x%d", data) + if res[0] != 1: + self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice)) + except ErrorReturnCode as Error: + self.fail("emmc: failed {} ".format(Error.exit_code)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return ""
\ No newline at end of file diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py new file mode 100644 index 0000000..6de1eaf --- /dev/null +++ b/test-cli/test/tests/qnand.py @@ -0,0 +1,48 @@ +import unittest +import sh +import uuid +from test.helpers.utils import save_file_to_disk + +class Qnand(unittest.TestCase): + params = None + __device = "10M" + __resultlist = None # resultlist is a python list of python dictionaries + + # varlist: device + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qnand, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QNandName = varlist.get('name', 'qnand') + self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0")) + self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk")) + self.__file_uuid = uuid.uuid4() + self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid) + nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"') + + if nandtestPath == '': + self.myNandTest = sh.nandtest + else: + self.myNandTest = sh.Command(nandtestPath) + self.__resultlist = [] + + def execute(self): + try: + res = self.myNandTest("-m", self.__device) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file), + description='nand {} test'.format(self.__device), + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + + except sh.ErrorReturnCode_1 as e: + self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout)) + + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py new file mode 100644 index 0000000..28310c6 --- /dev/null +++ b/test-cli/test/tests/qplc.py @@ -0,0 +1,121 @@ +import shutil +import unittest +import os.path +import os +import sh +import stat +import time +import ping3 +from netifaces import AF_INET +import netifaces as ni + +from test.helpers.md5 import md5_file +from test.helpers.plc import dcpPLC +from test.helpers.iseelogger import logObj + +class Qplc(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QPLCName = None + __plc = None + __board_uuid = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qplc, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__psdbObj = varlist["db"] + self.__board_uuid = varlist["boarduuid"] + + self.__QPLCName = varlist.get('name', 'qplc') + self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", "")) + self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", "")) + self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", "")) + self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0")) + self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0")) + self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0")) + self.__firmware_Path = varlist.get('firmwarepath', self.__xmlObj.getKeyVal(self.__QPLCName, "firmwarepath", "/root/hwtest-files/firmware")) + self.__skipflash = varlist.get('skipflash', self.__xmlObj.getKeyVal(self.__QPLCName, "skipflash", "0")) + self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device) + self.__factory_password = varlist.get('factory_password', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_password", "0")) + self.__firmware_password = varlist.get('firmware_password', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_password", "0")) + self.__plc_reset_wait = varlist.get('plc_reset_wait', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_reset_wait", "60")) + self.__resultlist = [] + + + def checkfiles(self): + if not os.path.isfile(self.__factoryTool): + return False, 'Factory tool Not found {}/{}'.format(self.__factoryTool) + if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware)): + return False, 'Firmware Not found {}/{}'.format(self.__firmware_Path, self.__firmware) + if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware_md5)): + return False, 'Firmware md5 Not found {}/{}'.format(self.__firmware_Path, self.__firmware_md5) + try: + mode = os.stat(self.__mtd_device).st_mode; + if not stat.S_ISCHR(mode): + return False, 'No device {} found'.format(self.__mtd_device) + except: + return False, 'No device {} found'.format(self.__mtd_device) + return True, '' + + def __flashMemory(self): + r, v = self.__plc.SaveFirmware('{}/{}'.format(self.__firmware_Path, self.__firmware)) + if not r: + self.fail(v) + + def execute(self): + # Check files + v, m = self.checkfiles() + if not v: + self.fail(m) + # do flash & verify + if self.__skipflash == '0': + self.__flashMemory() + # do Plc boot + self.__plc.setBootMode() + # Wait plc goes UP + if self.__gen_mac: + res, v = self.__psdbObj.get_plc_macaddr(self.__board_uuid) + if res == None and v == 'MAC limit by uuid': + self.fail('MAC limit by uuid') + elif res == None: + self.fail('BUG: ?????') + logObj.getlogger().info("MAC {}".format(res[0])) + plcMAC = res[0][0] + # wait for PLC to be turned on + attemptcounter = 0 + plcfound = False + while attemptcounter < 5 and not plcfound: + if self.__plc.discover(): + plcfound = True + else: + attemptcounter += 1 + time.sleep(5) + # mandatory wait before configuring the PLC chip + time.sleep(2) + self.__plc.set_plc2('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD', + self.__factory_password, + 'SYSTEM.PRODUCTION.MAC_ADDR', + '{}'.format(plcMAC), + self.__firmware_password) + # plc reset to save changes + self.__plc.setPLCReset() + time.sleep(int(self.__plc_reset_wait)) + # calculate the IP of the GPLC0000 board to ping + ip_eth01 = ni.ifaddresses('eth0:1')[AF_INET][0]['addr'] # plc_test_ip = 10.10.1.113 + ip_eth01_splited = ip_eth01.split('.') + ip_eth01_splited[3] = str(int(ip_eth01_splited[3]) + 100) + plc_test_ip = ".".join(ip_eth01_splited) # plc_test_ip = 10.10.1.213 + # ping against the GPLC0000 board + ping3.EXCEPTIONS = True + try: + ping3.ping(plc_test_ip, timeout=0.1) + except: + self.fail('PLC ping timeout') + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 8ec0210..49d4d54 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,22 +1,62 @@ -from test.helpers.syscmd import SysCommand import unittest +import sh +# from test.helpers.iseelogger import MeasureTime +# from test.helpers.iseelogger import logObj +from test.helpers.utils import save_file_to_disk class Qram(unittest.TestCase): + params = None + __memsize = None + __loops = None + __resultlist = [] # resultlist is a python list of python dictionaries - def __init__(self, testname, testfunc, memSize): + # varlist: memsize, loops + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qram, self).__init__(testfunc) - self.__memSize = memSize self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] - def execute(self): - str_cmd= "free -m" - free_command = SysCommand("free_ram", str_cmd) - if free_command.execute() == 0: - self.__raw_out = free_command.getOutput() - if self.__raw_out == "": - return -1 - lines = free_command.getOutput().splitlines() - aux = [int(s) for s in lines[1].split() if s.isdigit()] - self.failUnless(int(aux[0])>int(self.__memSize),"failed: total ram memory size lower than expected") + self.__QramName = varlist.get('name', 'qram') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1")) + self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk")) + self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt")) + + self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + + memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '') + + if memtesterPath == '': + self.myMemtester = sh.memtester else: - self.fail("failed: could not complete iperf command") + self.myMemtester = sh.Command(memtesterPath) + + def execute(self): + self.__resultlist = [] + try: + # mytime = MeasureTime() + res = self.myMemtester(self.__memsize, '{}'.format(self.__loops)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + # mytime.stop() + except sh.ErrorReturnCode as e: + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=e.stdout.decode('utf-8'), + result=self.__resultlist) + self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8'))) + + except Exception as details: + self.fail('Error: {}'.format(details)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 1d02f78..df493d2 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -1,28 +1,46 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest import time +import re + class Qrtc(unittest.TestCase): + params = None + __rtc = None + __resultlist = None # resultlist is a python list of python dictionaries - def __init__(self, testname, testfunc, rtc): + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qrtc, self).__init__(testfunc) - self.__rtc = rtc + if "rtc" in varlist: + self.__rtc = varlist["rtc"] + else: + raise Exception('rtc param inside Qrtc must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): - str_cmd = "hwclock -f {}".format(self.__rtc) - rtc_set = SysCommand("rtc_set", str_cmd) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time1 = int((curr_hour[4].split(":"))[2]) + # get time from RTC peripheral + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time1 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) time.sleep(1) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time2 = int((curr_hour[4].split(":"))[2]) - if time1==time2: + # get time from RTC another time + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time2 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) + # check if the seconds of both times are different + if time1 == time2: self.fail("failed: RTC is not running") else: - self.fail("failed: couldn't execute hwclock command 2nd time") + self.fail("failed: couldn't execute hwclock command for the second time") else: self.fail("failed: couldn't execute hwclock command") + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py deleted file mode 100644 index b8a5a48..0000000 --- a/test-cli/test/tests/qscreen.py +++ /dev/null @@ -1,31 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import time -from test.helpers.cv_display_test import pattern_detect - -class Qscreen(unittest.TestCase): - - def __init__(self, testname, testfunc, display): - super(Qscreen, self).__init__(testfunc) - self.__display = display - self._testMethodDoc = testname - - def execute(self): - str_cmd = "fbi -T 1 --noverbose -d /dev/{} test/files/test_pattern.png".format(self.__display) - display_image = SysCommand("display_image", str_cmd) - if display_image.execute() == -1: - test_screen = pattern_detect(1) - if not test_screen=="0": - self.fail("failed: {}".format(test_screen)) - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() - else: - self.fail("failed: could not display the image") - try: - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() - except ValueError: - print("COULD NOT DISPLAY IMAGE") - diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 43ba3c8..402e33f 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -1,30 +1,55 @@ -from test.helpers.syscmd import SysCommand import unittest import uuid import serial import time + class Qserial(unittest.TestCase): + params = None + __port = None + __baudrate = None + __resultlist = None # resultlist is a python list of python dictionaries - def __init__(self, testname, testfunc, port, baudrate): + # varlist: port, baudrate + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qserial, self).__init__(testfunc) - self.__port = port - self.__serial = serial.Serial(self.__port, timeout=1) - self.__serial.baudrate = baudrate + if "port" in varlist: + self.__port = varlist["port"] + else: + raise Exception('port param inside Qserial must be defined') + + if "baudrate" in varlist: + self.__baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname + self.__resultlist = [] + + # open serial connection + self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1) def __del__(self): self.__serial.close() def execute(self): + # generate a random uuid + test_uuid = str(uuid.uuid4()).encode() + # clean serial port self.__serial.flushInput() self.__serial.flushOutput() - test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port)) + self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: - if (self.__serial.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port)) + # check if what it was sent is equal to what has been received + if self.__serial.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port)) + + def getresults(self): + return self.__resultlist + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qtemplate.py b/test-cli/test/tests/qtemplate.py deleted file mode 100644 index 940cded..0000000 --- a/test-cli/test/tests/qtemplate.py +++ /dev/null @@ -1,51 +0,0 @@ -#IF COMMAND IS NEEDED -from test.helpers.syscmd import SysCommand -import unittest -#class name -class Qtemplate(unittest.TestCase): - # Initialize the variables - __variable1 = "Value-a" - __variable2 = "Value-b" - #.... - __variablen = "Value-n" - - def __init__(self, testname, testfunc, input1=None, inputn=None): - # Doing this we will initialize the class and later on perform a particular method inside this class - super(Qtemplate, self).__init__(testfunc) - self.__testname = testname - self.__input1 = input1 - self.__inputn = inputn - self._testMethodDoc = testname - - - - def execute(self): - str_cmd = "command" - t = SysCommand("command-name", str_cmd) - if t.execute() == 0: - self.__raw_out = t.getOutput() - if self.__raw_out == "": - return -1 - lines = t.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - print(self.__MB_real) - print(self.__BW_real) - self.failUnless(int(self.__BW_real)>int(self.__OKBW)*0.9,"FAIL:BECAUSE...") - else: - return -1 - return 0 - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 44490bc..4f76d0c 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,60 +1,95 @@ -from test.helpers.syscmd import SysCommand +import shutil import unittest +import os.path +import os +import sh + +from test.helpers.md5 import md5_file class Qusb(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QusbName = None - def __init__(self, testname, testfunc, devLabel, numPorts): + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qusb, self).__init__(testfunc) - self.__numPorts = numPorts self._testMethodDoc = testname - self.__devLabel = devLabel - if testname=="USBOTG": - self.__usbFileName = "/this_is_an_usb_otg" - self.__usbtext = "USBOTG" - elif testname=="SATA": - self.__usbFileName = "/this_is_a_sata" - self.__usbtext = "SATA" - else: - self.__usbFileName = "/this_is_an_usb_host" - self.__usbtext = "USBHOST" - self.__numUsbFail=[] + self.__xmlObj = varlist["xml"] + + self.__QusbName = varlist.get('name', 'qusb') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3")) + self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files")) + self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin")) + self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5")) + self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2")) + self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", "")) + + self.__resultlist = [] + + def removefileIfExist(self, fname): + if os.path.exists(fname): + try: + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def getTestFile_md5(self, fname): + t = '' + with open('{}'.format(fname), 'r') as outfile: + t = outfile.read().rstrip("\n") + outfile.close() + return t + + def ExecuteTranfer(self): + try: + originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5)) + if originalMD5 == '': + self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5)) + # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.sync() + md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source)) + self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if originalMD5 != md5val: + return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val) + except OSError as why: + return False,'Error: {} tranfer failed'.format(why.errno) + except sh.ErrorReturnCode as shError: + return False, 'USB Exception: tranfer failed' + except Exception as details: + return False, 'USB Exception: {} tranfer failed'.format(details) + return True, '' def execute(self): - str_cmd= "lsblk -o LABEL" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - return -1 - lines = lsblk_command.getOutput().splitlines() - host_list=[] - for i in range(len(lines)): - if str(lines[i].decode('ascii'))==self.__devLabel: - host_list.append(i) - if len(host_list)==int(self.__numPorts): - str_cmd = "lsblk -o MOUNTPOINT" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - print("failed: no command output") - self.fail("failed: no command output") - else: - lines = lsblk_command.getOutput().splitlines() - for i in range(len(host_list)): - file_path=str(lines[host_list[i]].decode('ascii')) + self.__usbFileName - usb_file = open(file_path, 'r') - read=usb_file.read() - if read.find(self.__usbtext)!=-1: - print(file_path + " --> OK!") - else: - self.fail("failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) - self.__numUsbFail.append(host_list[i]) - usb_file.close() - else: - self.fail("failed: couldn't execute lsblk command") - - else: - self.fail("failed: reference and real usb host devices number mismatch") - else: - self.fail("failed: couldn't execute lsblk command") + v, m = self.checkfiles() + if not v: + self.fail(m) + countLoops = int(self.__loops) + while countLoops > 0: + res, msg = self.ExecuteTranfer() + if not res: + res, msg = self.ExecuteTranfer() + if not res: + self.fail(msg) + countLoops = countLoops - 1 + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qvideo.py b/test-cli/test/tests/qvideo.py new file mode 100644 index 0000000..225940b --- /dev/null +++ b/test-cli/test/tests/qvideo.py @@ -0,0 +1,125 @@ +import unittest +import sh +import time +from test.helpers.camara import Camara +from test.helpers.detect import Detect_Color +from test.helpers.sdl import SDL2_Test + +class Qvideo(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qvideo, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QVideoName = varlist.get('name', 'qvideo') + self.__resultlist = [] + self.__w = int(varlist.get('capture_size_w', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_w", 1280))) + self.__h = int(varlist.get('capture_size_h', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_h", 720))) + self.__discard_frames_Count = int(varlist.get('capture_discardframes',self.__xmlObj.getKeyVal(self.__QVideoName, "capture_discardframes", 3))) + self.__frame_mean = int(varlist.get('capture_framemean', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_framemean", 3))) + self.__max_failed = int(varlist.get('capture_maxfailed', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_maxfailed", 1))) + self.__cam_setupscript = varlist.get('cam_setupfile', self.__xmlObj.getKeyVal(self.__QVideoName, "cam_setupfile", + "/root/hwtest-files/board/scripts/v4l-cam.sh")) + self.__sdl_display = varlist.get('sdl_display', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_display", ":0")) + self.__sdl_driver = varlist.get('sdl_driver', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_driver", "x11")) + self.__camdevice = varlist.get('camdevice', self.__xmlObj.getKeyVal(self.__QVideoName, "camdevice", "video0")) + + self.__Camara = Camara(setup_script_path=self.__cam_setupscript, device=self.__camdevice, width=self.__w, height=self.__h) + self.__SDL2_Test = SDL2_Test(driver=self.__sdl_driver, display=self.__sdl_display, w=self.__w, h=self.__h) + self.__SDL2_Test.Clear() + + def __drop_frames(self, frame_count): + count = frame_count + while count > 0: + _ = self.__Camara.getFrame() + count -= 1 + + def Verify_Color(self, color): + self.paintColor(color) + count = self.__frame_mean + res_ok = 0 + res_failed = 0 + r_count = 0 + c_mean = 0 + while count > 0: + res, t = self.Capture(color) + if res: + res_ok += 1 + else: + res_failed += 1 + if t == "count": + r_count += 1 + else: + c_mean +=1 + count -= 1 + self.unpaintColor(color) + if res_failed > self.__max_failed: + if r_count > 0: + return '{}_COLOR_FAILED'.format(color) + elif c_mean > 0: + return 'MEAN_{}_FAILED'.format(color) + return "ok" + + def paintColor(self, color): + self.__SDL2_Test.Paint(color) + time.sleep(3) + self.__drop_frames(self.__Camara.getFrameCount()) + + def unpaintColor(self, color): + self.__SDL2_Test.Clear() + time.sleep(1) + + def Capture(self, color): + image = self.__Camara.getFrame() + if image is None: + return None + dtObject = Detect_Color(image) + if color == 'RED': + _, _, mean, count = dtObject.getRed() + elif color == 'BLUE': + _, _, mean, count = dtObject.getBlue() + elif color == 'GREEN': + c_mask, croped, mean, count = dtObject.getGreen() + + return self.checkThreshold(color, mean, count) + + def checkThreshold(self, color, mean, count): + min_count = int( + self.params.get('{}_min_count'.format(color), + self.__xmlObj.getKeyVal(self.__QVideoName, '{}_min_count'.format(color), 50000))) + + str = "" + # first verify count + if count >= min_count: + return True, "" + else: + str = "count" + return False, str + + def execute(self): + self.__resultlist = [] + + # Open camara + if not self.__Camara.Open(): + self.fail('Error: USB camera not found') + # discard initial frames + self.__drop_frames(self.__discard_frames_Count) + # Test + res = self.Verify_Color('RED') + if res != "ok": + self.fail("failed: RED verification failed") + res = self.Verify_Color('BLUE') + if res != "ok": + self.fail("failed: BLUE verification failed") + res = self.Verify_Color('GREEN') + if res != "ok": + self.fail("failed: GREEN verification failed") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index d08149b..4695041 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,45 +1,99 @@ -from test.helpers.syscmd import SysCommand import unittest -import subprocess +import time +import uuid +import iperf3 +import netifaces +from test.helpers.iw import iwScan +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qwifi(unittest.TestCase): + __serverip = None + __numbytestx = None + __bwexpected = None + __port = None + params = None + __bwreal = None + __resultlist = None # resultlist is a python list of python dictionaries + timebetweenattempts = 5 - def __init__(self, testname, testfunc, signal): + # varlist content: serverip, bwexpected, port + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qwifi, self).__init__(testfunc) - self.__signal = signal self._testMethodDoc = testname - # WiFi SERVERIP fixed at the moment. - self._serverip = "192.168.5.1" + self.__xmlObj = varlist["xml"] + self.__QwifiName = varlist.get('name', 'qwifi') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json")) + self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) + self.__resultlist = [] + + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False def execute(self): - # First check connection with the wifi server using ping command - #str_cmd = "ping -c 1 {} > /dev/null".format(self._serverip) - #wifi_ping = SysCommand("wifi_ping", str_cmd) - #wifi_ping.execute() - #res = subprocess.call(['ping', '-c', '1', self._serverip]) - p = subprocess.Popen(['ping','-c','1',self._serverip,'-W','2'],stdout=subprocess.PIPE) - p.wait() - res=p.poll() - if res == 0: - str_cmd= "iw wlan0 link" - wifi_stats = SysCommand("wifi_stats", str_cmd) - if wifi_stats.execute() == 0: - w_stats = wifi_stats.getOutput().decode('ascii').splitlines() - #self._longMessage = str(self.__raw_out).replace("'", "") - if not w_stats[0] == "Not connected.": - signal_st = w_stats[5].split(" ")[1] - try: - int(signal_st) - if -1*int(signal_st) > int(self.__signal): - self.fail("failed: signal to server lower than expected") - except ValueError: - self.fail("failed: error output (Bad connection?)") + self.__resultlist = [] + stationList = {} + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + # scan networks + scanner = iwScan(self.__interface) + if scanner.scan(): + stationList = scanner.getStations() + # check if we are connected + if not scanner.findConnected(): + self.fail('Not connected to test AP') + else: + strError = scanner.getLastError() + self.fail('qWifi scanner Execution error: {}'.format(strError)) + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file), + description='wifi scan', + mime='application/json', + json_data=stationList, + result=self.__resultlist) + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='wifi {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return else: - self.fail("failed: error output (Bad connection?)") - #tx_brate = float(w_stats[6].split(" ")[2]) + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) else: - self.fail("failed: couldn't execute iw command") - else: - self.fail("failed: ping to server {}".format(self._serverip)) + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qWifi (max tries) Execution error: {}'.format(result.error)) + def getresults(self): + return self.__resultlist + def gettextresult(self): + return "" diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3c4d1cb..1f5d1e4 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,118 +1,431 @@ -from test.helpers.get_dieid import genDieid -from subprocess import call -import xml.etree.ElementTree as XMLParser -import errno -import sys +import time +import sh import os import unittest +import socket +import logging +import sys +import getopt +import random + +from datetime import datetime +from test.helpers.int_registers import get_die_id +from test.helpers.int_registers import get_internal_mac +from subprocess import call from test.helpers.testsrv_db import TestSrv_Database +from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner -from test.tests.qbutton import Qbutton -from test.helpers.syscmd import TestSysCommand -from test.helpers.syscmd import SysCommand -from test.tests.qiperf import QIperf from test.tests.qethernet import Qethernet -from test.tests.qaudio import Qaudio from test.tests.qram import Qram from test.tests.qusb import Qusb -from test.tests.qi2c import Qi2c from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial -from test.tests.qscreen import Qscreen from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex -from test.tests.qamp import Qamp -from test.tests.qflash import Qflasher -from test.helpers.finisher import Finisher +from test.tests.qamper import Qamper +from test.tests.qnand import Qnand +from test.tests.qaudio import Qaudio +from test.tests.qdmesg import Qdmesg +from test.tests.qmmcflash import Qmmcflash +from test.tests.qplc import Qplc +from test.tests.qvideo import Qvideo from test.helpers.globalVariables import globalVar +from test.tasks.flasheeprom import EEprom_Flasher +from test.tasks.flashmemory import NAND_Flasher +from test.helpers.qrreader import QRReader +from test.helpers.cmdline import LinuxKernelCmd +from test.helpers.amper import Amper +from test.enums.StationStates import StationStates +import xml.etree.ElementTree as XMLParser +from test.helpers.iseelogger import logObj +from test.helpers.utils import str2bool + +# global variables +psdbObj = TestSrv_Database() +xmlObj = None +test_abspath = None +tests_manually_executed = [] +AutoTest = False +OverrideAutoReboot = False +delay_reboot = '0' + # define clear function def clear(): # check and make call for specific operating system - _ = call('clear' if os.name =='posix' else 'cls') + _ = call('clear' if os.name == 'posix' else 'cls') +def reboot_board(): + logObj.getlogger().debug("#reboot board#") + if not OverrideAutoReboot: + set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name) + if delay_reboot == '0': + time.sleep(5) + sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True) + # sh.reboot(_bg=True) + print("REBOOT...") + exit(0) -def create_board(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - tree = XMLParser.parse('setup.xml') - root = tree.getroot() - suite = unittest.TestSuite() - for element in root.iter('board'): - # print(str(element.tag) + str(element.attrib)) - model_id = element.attrib['model'] - variant = element.attrib['variant'] - nstation = element.attrib['station'] - globalVar.g_mid=model_id + "-" + variant - globalVar.station=nstation - processor_id=genDieid(globalVar.g_mid) - print(globalVar.g_mid) - print(processor_id) - globalVar.g_uuid = psdb.create_board(processor_id, model_id, variant, bmac = None) - -def testsuite(): - psdb=TestSrv_Database() - psdb.open("setup.xml") - suite = unittest.TestSuite() - tests=psdb.getboard_comp_test_list(globalVar.g_uuid) - for i in range(len(tests)): - #newstr = oldstr.replace("M", "") - variables=str(tests[i][0]).split(",") - testname=variables[0].replace('(', '') - testdes=variables[1] - testfunc=variables[2] - if len(tests)>2: - testparam=variables[3].replace(')', '') - testparam = testparam.replace('"', '') - testparam = testparam.replace(';', "','") - if testparam == "": - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) +def wait_for_reboot(): + while True: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_REBOOT.name: + reboot_board() + time.sleep(5) + +def execute_station_error(text): + logObj.getlogger().error("Station error: #{}#".format(text)) + wait_for_reboot() + +def check_first_state(): + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + logObj.getlogger().debug('station state during the first check is {}'.format(currentstate)) + wait_for_reboot() + else: + count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240")) + while currentstate != StationStates.WAIT_TEST_START.name: + logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate)) + if count_t <= 0: + # Error + execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate)) else: - command="suite.addTest({}('{}','execute','{}'))".format(testfunc,testname,testparam) + count_t = count_t - 5 + time.sleep(5) + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + wait_for_reboot() + + +def set_next_state(newstate): + statewritten = psdbObj.change_station_state(globalVar.station, newstate) + if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name: + wait_for_reboot() + +def reboot_if_autotest(): + # reset board if AUTOTEST is enabled + if AutoTest: + reboot_board() + + +def create_paramslist(params): + paramlist = {} + for varname, varvalue in params: + paramlist[varname] = varvalue + return paramlist + + +def add_test(suite, testdefname, paramlist): + if testdefname == "RAM": + suite.addTest(Qram(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "SERIALDUAL": + suite.addTest(Qduplex(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "EEPROM": + suite.addTest(Qeeprom(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "SERIAL": + suite.addTest(Qserial(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "RTC": + suite.addTest(Qrtc(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "CONSUMPTION": + suite.addTest(Qamper(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "ETHERNET": + suite.addTest(Qethernet(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "NAND": + suite.addTest(Qnand(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "WIFI": + suite.addTest(Qwifi(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "USB": + suite.addTest(Qusb(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "AUDIO": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "DMESG": + tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "MMCFLASH": + suite.addTest(Qmmcflash(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "PLC_MODEM": + suite.addTest(Qplc(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "VIDEO": + suite.addTest(Qvideo(testdefname, "execute", paramlist)) + return 0 + else: + logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname)) + return 1 + + +def create_testsuite(): + # create an object TestSuite + suite1 = unittest.TestSuite() + # get list of tests for this board + tests = psdbObj.get_tests_list(globalVar.g_uuid) + logObj.getlogger().debug("Create TestSuite: {}".format(tests)) + # loop in every test for this board + for testid, testdefname in tests: + # get params for this test + params = psdbObj.get_test_params_list(testid) + paramlist = create_paramslist(params) + # add the testid as parameter + paramlist["testid"] = testid + paramlist["boarduuid"] = globalVar.g_uuid + paramlist["testidctl"] = globalVar.testid_ctl + paramlist['testPath'] = test_abspath + logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist)) + paramlist["xml"] = xmlObj + paramlist["db"] = psdbObj + # add test to TestSuite + add_test(suite1, testdefname, paramlist) + + return suite1 + + +def create_board(): + + cmd = LinuxKernelCmd() + model_id = cmd.getkvar("bmodel", "none") + variant = cmd.getkvar("bvariant", "none") + logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant)) + + if model_id == "none" or variant == "none": + # Error + execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant)) + + # get model id + globalVar.g_mid = model_id + "-" + variant + logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid)) + + # get station + if "Station" not in globalVar.station: + # Error + execute_station_error("Hostname not defined") + + processor_id = get_die_id(globalVar.g_mid) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, + get_internal_mac(globalVar.g_mid)) + + logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid)) + + +def get_taskvars_list(uuid): + varlist = {} + for varname, varvalue in psdbObj.get_task_variables_list(uuid): + varlist[varname] = varvalue + return varlist + +def execute_extra_task(): + # list of task + myTask = [] + # Change state to "EXTRATASKS_RUNNING" + set_next_state(StationStates.EXTRATASKS_RUNNING.name) + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) + # get extra variables + varlist = get_taskvars_list(globalVar.g_uuid) + + # Put default Vars + varlist["boarduuid"] = globalVar.g_uuid + varlist["testidctl"] = globalVar.testid_ctl + varlist['testPath'] = test_abspath + varlist["xml"] = xmlObj + varlist["db"] = psdbObj + + # Get list Extra Task and activation + eeprom = varlist.get('eeprom', '0') + flashNand = varlist.get('flashNAND', '0') + + if eeprom == '1': + myEEprom = EEprom_Flasher(varlist) + myTask.append(myEEprom) + if flashNand == '1': + myNAND = NAND_Flasher(varlist) + myTask.append(myNAND) + + taskresult = True + for task in myTask: + res, data = task.Execute() + if res: + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data) + logObj.getlogger().info("task: {} ok".format(task.getTaskName())) else: - print(testname) - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) - exec(command) - globalVar.testid_ctl=psdb.open_testbatch(globalVar.g_uuid) - return suite - - -def finish_test(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - auxs = psdb.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) - globalVar.fstatus = auxs[0][0] - # Burn eeprom struct - psdb = TestSrv_Database() - psdb.open("setup.xml") - # We should call getboard_eeprom only if test was ok - if globalVar.fstatus: - aux = psdb.getboard_eeprom(globalVar.g_uuid) - finish = Finisher(aux) - finish.end_ok() + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data) + logObj.getlogger().info("task: {} failed".format(task.getTaskName())) + taskresult = False + + # check final taskresult => ok or fail + if taskresult: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") else: - finish = Finisher(globalVar.g_uuid) - finish.end_fail() - # Update set_test current_test with 'END' so that it finally gets painted in green - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, "END","FINISH") - -def main(): - #addtesttomodel() - #addtestdef() + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + for task in myTask: + if task.getError() != '': + execute_station_error("Unable to complete extra tasks: {}".format(task.getError())) + # Not emit a error message + execute_station_error("Unable to complete extra tasks") + +def getBarCode(fake): + + set_next_state(StationStates.WAITING_FOR_SCANNER.name) + if fake: + psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001))) + return + qrreceived = False + wait_count = psdbObj.get_setup_variable("test_main", xmlObj.getKeyVal("test_main", "qr_wait_time", "120")) + while not qrreceived or int(wait_count) > 0: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(10): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + return + qr.closeQR() + else: + qrreceived = True + del qr + time.sleep(1) + wait_count = wait_count - 1 + if int(wait_count) == 0: + execute_station_error("Cannot find a barcode scanner") + + +def execute_test (): + # initialize the board create_board() - #globalVar.g_uuid = "1f59c654-0cc6-11e8-8d51-e644f56b8edd" - try: - os.remove("test_results.dat") - except: - pass - runner = SimpleTestRunner() - runner.run(testsuite()) - finish_test() + # create a process + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl)) + + # Change state to "TESTS_RUNNING" + set_next_state(StationStates.TESTS_RUNNING.name) + + # generate suits + suite1 = create_testsuite() + # Execute tests (round 1) + runner1 = SimpleTestRunner(psdbObj) + logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl)) + testresult = runner1.run(suite1) + logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult)) + + # Execute manually tests + for test in tests_manually_executed: + logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName())) + mtestResult = test.execute() + logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(), + 'ok' if mtestResult else 'fail')) + + # execute aditional tasks, only if the test was successful + if testresult.wasSuccessful(): + logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl)) + set_next_state(StationStates.TESTS_OK.name) + logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl)) + execute_extra_task() + logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl)) + getBarCode(AutoTest) + set_next_state(StationStates.FINISHED.name) + logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl)) + else: + logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl)) + # Change state to "TESTS_FAILED" + set_next_state(StationStates.TESTS_FAILED.name) + + # reset board if AUTOTEST is enabled + reboot_if_autotest() + + +def enableFaulthandler(): + """ Enable faulthandler for all threads. + + If the faulthandler package is available, this function disables and then + re-enables fault handling for all threads (this is necessary to ensure any + new threads are handled correctly), and returns True. + + If faulthandler is not available, then returns False. + """ + try: + import faulthandler + # necessary to disable first or else new threads may not be handled. + faulthandler.disable() + faulthandler.enable(all_threads=True) + return True + except ImportError: + return False if __name__ == "__main__": + # Clear the shell screen clear() - main() + + enableFaulthandler() + + test_abspath = os.path.dirname(os.path.abspath(__file__)) + configFile = 'setup.xml' + loglevel = 'INFO' + logObj.getlogger().info("Test program Started : {}".format(datetime.now())) + try: + argv = sys.argv[1:] + opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel']) + for opt, args in opts: + if opt in ('-c', '--setup'): + configFile = args + if opt in ('-l', '--loglevel'): + loglevel = args + except getopt.GetoptError as err: + logObj.getlogger().error('#{}#'.format(err)) + + if loglevel != 'INFO': + logObj.setLogLevel(loglevel) + + # Try to parse the setup.xml file + logObj.getlogger().debug("parse config file: {}". format(configFile)) + try: + xmlObj = XMLSetup(os.path.join(test_abspath, configFile)) + except XMLParser.ParseError as error: + # Error + execute_station_error("Error parsing {} file {}".format(configFile, error.msg)) + + # Try to connect to the DB, according to setup.xml configuration + if not psdbObj.open(xmlObj): + # Error + execute_station_error("Cannot open DB connection") + else: + logObj.getlogger().debug("database connection ok: {}".format(psdbObj.getConfig())) + + # get station name + globalVar.station = socket.gethostname() + logObj.getlogger().info("Test Station: {}".format(globalVar.station)) + + if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1': + AutoTest = True + + OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0")) + delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0") + + logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot)) + + # Check if current state is "WAIT_TEST_START". if not, waits here + if xmlObj.getKeyVal("test_main", "overrideCheckWaitStart", "0") == "0": + check_first_state() + else: + psdbObj.setDevelStationState(globalVar.station, "WAIT_TEST_START") + + # Change state to "TESTS_CHECKING_ENV" + set_next_state(StationStates.TESTS_CHECKING_ENV.name) + + # execute main program + execute_test() + + exit(0) |