summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
committerManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
commitcf19bfe18cbd283b188a858ee1629f9909c924f4 (patch)
tree1efb23519727130058401df090ab1b5f4cc8ba99
parentb6932fbaf898724ae87c29f8965621610f377084 (diff)
parentd5b273a3b58a250742049df4ca0ef0ba54f53d33 (diff)
downloadboard-cf19bfe18cbd283b188a858ee1629f9909c924f4.zip
board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.gz
board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.bz2
Merge branch 'sopa-test'rel.0.1sopa-test
-rw-r--r--.idea/.gitignore8
-rw-r--r--.idea/board.iml8
-rw-r--r--.idea/deployment.xml23
-rw-r--r--.idea/inspectionProfiles/profiles_settings.xml6
-rw-r--r--.idea/misc.xml10
-rw-r--r--.idea/modules.xml8
-rw-r--r--.idea/sshConfigs.xml8
-rw-r--r--.idea/vcs.xml6
-rw-r--r--.idea/webServers.xml14
-rwxr-xr-xscripts/v4l-cam.sh14
-rw-r--r--test-cli/.idea/dictionaries/hfernandez.xml3
-rw-r--r--test-cli/.idea/inspectionProfiles/profiles_settings.xml7
-rw-r--r--test-cli/.idea/misc.xml7
-rw-r--r--test-cli/.idea/modules.xml8
-rw-r--r--test-cli/.idea/test-cli.iml12
-rw-r--r--test-cli/.idea/vcs.xml6
-rw-r--r--test-cli/.idea/workspace.xml120
-rw-r--r--test-cli/__init__.py (renamed from test-cli/test/suites/__init__.py)0
-rw-r--r--test-cli/setup.xml35
-rw-r--r--test-cli/test/__init__.pycbin114 -> 124 bytes
-rw-r--r--test-cli/test/enums/StationStates.py22
-rw-r--r--test-cli/test/enums/__init__.py (renamed from test-cli/test/helpers/uboot_flasher.py)0
-rw-r--r--test-cli/test/files/OpenSans-Regular.ttfbin0 -> 217360 bytes
-rw-r--r--test-cli/test/files/dtmf-13579.wavbin8045 -> 0 bytes
-rw-r--r--test-cli/test/files/test_ok.pngbin18150 -> 0 bytes
-rw-r--r--test-cli/test/files/test_pattern.pngbin4457 -> 0 bytes
-rw-r--r--test-cli/test/helpers/DiskHelpers.py9
-rw-r--r--test-cli/test/helpers/__init__.pycbin122 -> 132 bytes
-rw-r--r--test-cli/test/helpers/amper.py154
-rw-r--r--test-cli/test/helpers/button_script.sh4
-rw-r--r--test-cli/test/helpers/camara.py116
-rw-r--r--test-cli/test/helpers/changedir.py14
-rw-r--r--test-cli/test/helpers/cmdline.py28
-rw-r--r--test-cli/test/helpers/cv_display_test.py28
-rw-r--r--test-cli/test/helpers/detect.py61
-rw-r--r--test-cli/test/helpers/finisher.py151
-rw-r--r--test-cli/test/helpers/get_dieid.py43
-rw-r--r--test-cli/test/helpers/globalVariables.py3
-rw-r--r--test-cli/test/helpers/gpio.py55
-rw-r--r--test-cli/test/helpers/int_registers.py68
-rw-r--r--test-cli/test/helpers/iseelogger.py62
-rw-r--r--test-cli/test/helpers/iw.py124
-rw-r--r--test-cli/test/helpers/md5.py8
-rw-r--r--test-cli/test/helpers/plc.py82
-rw-r--r--test-cli/test/helpers/psqldb.py44
-rw-r--r--test-cli/test/helpers/qrreader.py123
-rw-r--r--test-cli/test/helpers/sdl.py126
-rw-r--r--test-cli/test/helpers/setup_xml.py50
-rw-r--r--test-cli/test/helpers/syscmd.py72
-rw-r--r--test-cli/test/helpers/testsrv_db.py262
-rw-r--r--test-cli/test/helpers/usb.py270
-rw-r--r--test-cli/test/helpers/utils.py68
-rw-r--r--test-cli/test/runners/simple.py80
-rw-r--r--test-cli/test/scripts/__init__.py0
-rwxr-xr-xtest-cli/test/scripts/usb.sh14
-rw-r--r--test-cli/test/tasks/__init__.py0
-rw-r--r--test-cli/test/tasks/flasheeprom.py77
-rw-r--r--test-cli/test/tasks/flashmemory.py44
-rw-r--r--test-cli/test/tests/qamp.py87
-rw-r--r--test-cli/test/tests/qamper.py74
-rw-r--r--test-cli/test/tests/qaudio.py119
-rw-r--r--test-cli/test/tests/qbutton.py54
-rw-r--r--test-cli/test/tests/qdmesg.py32
-rw-r--r--test-cli/test/tests/qduplex_ser.py69
-rw-r--r--test-cli/test/tests/qeeprom.py90
-rw-r--r--test-cli/test/tests/qethernet.py161
-rw-r--r--test-cli/test/tests/qflash.py77
-rw-r--r--test-cli/test/tests/qi2c.py29
-rw-r--r--test-cli/test/tests/qiperf.py53
-rw-r--r--test-cli/test/tests/qmmcflash.py55
-rw-r--r--test-cli/test/tests/qnand.py48
-rw-r--r--test-cli/test/tests/qplc.py121
-rw-r--r--test-cli/test/tests/qram.py68
-rw-r--r--test-cli/test/tests/qrtc.py44
-rw-r--r--test-cli/test/tests/qscreen.py31
-rw-r--r--test-cli/test/tests/qserial.py43
-rw-r--r--test-cli/test/tests/qtemplate.py51
-rw-r--r--test-cli/test/tests/qusb.py139
-rw-r--r--test-cli/test/tests/qvideo.py125
-rw-r--r--test-cli/test/tests/qwifi.py120
-rw-r--r--test-cli/test_main.py495
81 files changed, 3639 insertions, 1111 deletions
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..73f69e0
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# Editor-based HTTP Client requests
+/httpRequests/
diff --git a/.idea/board.iml b/.idea/board.iml
new file mode 100644
index 0000000..0084ecc
--- /dev/null
+++ b/.idea/board.iml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+ <component name="NewModuleRootManager">
+ <content url="file://$MODULE_DIR$" />
+ <orderEntry type="jdk" jdkName="Remote Python 3.5.2 (sftp://root@192.168.70.103:22/usr/bin/python3)" jdkType="Python SDK" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ </component>
+</module> \ No newline at end of file
diff --git a/.idea/deployment.xml b/.idea/deployment.xml
new file mode 100644
index 0000000..0f2a42d
--- /dev/null
+++ b/.idea/deployment.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="PublishConfigData" autoUpload="Always" serverName="root@192.168.70.103:22">
+ <serverData>
+ <paths name="root@192.168.70.103:22">
+ <serverdata>
+ <mappings>
+ <mapping deploy="/root/hwtest-files/board" local="$PROJECT_DIR$" />
+ </mappings>
+ </serverdata>
+ </paths>
+ <paths name="station03">
+ <serverdata>
+ <mappings>
+ <mapping deploy="/root/hwtest-files/board" local="$PROJECT_DIR$" web="/" />
+ <mapping local="" />
+ </mappings>
+ </serverdata>
+ </paths>
+ </serverData>
+ <option name="myAutoUpload" value="ALWAYS" />
+ </component>
+</project> \ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+<component name="InspectionProjectProfileManager">
+ <settings>
+ <option name="USE_PROJECT_PROFILE" value="false" />
+ <version value="1.0" />
+ </settings>
+</component> \ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..73e130a
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="JavaScriptSettings">
+ <option name="languageLevel" value="ES6" />
+ </component>
+ <component name="ProjectRootManager" version="2" project-jdk-name="Remote Python 3.5.2 (sftp://root@192.168.70.103:22/usr/bin/python3)" project-jdk-type="Python SDK" />
+ <component name="PyPackaging">
+ <option name="earlyReleasesAsUpgrades" value="true" />
+ </component>
+</project> \ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..04147ee
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="ProjectModuleManager">
+ <modules>
+ <module fileurl="file://$PROJECT_DIR$/.idea/board.iml" filepath="$PROJECT_DIR$/.idea/board.iml" />
+ </modules>
+ </component>
+</project> \ No newline at end of file
diff --git a/.idea/sshConfigs.xml b/.idea/sshConfigs.xml
new file mode 100644
index 0000000..786082f
--- /dev/null
+++ b/.idea/sshConfigs.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="SshConfigs">
+ <configs>
+ <sshConfig authType="PASSWORD" host="192.168.70.103" id="472af0db-b796-4ceb-b3b2-0756d856cbf6" port="22" nameFormat="DESCRIPTIVE" username="root" />
+ </configs>
+ </component>
+</project> \ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="VcsDirectoryMappings">
+ <mapping directory="$PROJECT_DIR$" vcs="Git" />
+ </component>
+</project> \ No newline at end of file
diff --git a/.idea/webServers.xml b/.idea/webServers.xml
new file mode 100644
index 0000000..ae7016a
--- /dev/null
+++ b/.idea/webServers.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="WebServers">
+ <option name="servers">
+ <webServer id="9c70e797-df67-4684-951a-436bc6a0c92b" name="station03">
+ <fileTransfer rootFolder="/root/" accessType="SFTP" host="192.168.70.103" port="22" sshConfigId="472af0db-b796-4ceb-b3b2-0756d856cbf6" sshConfig="root@192.168.70.103:22 password">
+ <advancedOptions>
+ <advancedOptions dataProtectionLevel="Private" passiveMode="true" shareSSLContext="true" />
+ </advancedOptions>
+ </fileTransfer>
+ </webServer>
+ </option>
+ </component>
+</project> \ No newline at end of file
diff --git a/scripts/v4l-cam.sh b/scripts/v4l-cam.sh
new file mode 100755
index 0000000..5501fec
--- /dev/null
+++ b/scripts/v4l-cam.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+#v4l2-ctl -d /dev/video0 -c exposure_auto=3
+v4l2-ctl -d /dev/video0 -c brightness=0
+#v4l2-ctl -d /dev/video0 -c contrast=20
+#v4l2-ctl -d /dev/video0 -c saturation=55
+#v4l2-ctl -d /dev/video0 -c hue=0
+#v4l2-ctl -d /dev/video0 -c white_balance_temperature_auto=0
+#v4l2-ctl -d /dev/video0 -c gamma=100
+#v4l2-ctl -d /dev/video0 -c power_line_frequency=1
+#v4l2-ctl -d /dev/video0 -c white_balance_temperature=4500
+#v4l2-ctl -d /dev/video0 -c sharpness=2
+#v4l2-ctl -d /dev/video0 -c backlight_compensation=2
+# v4l2-ctl -d /dev/video0 -c exposure_absolute=166
diff --git a/test-cli/.idea/dictionaries/hfernandez.xml b/test-cli/.idea/dictionaries/hfernandez.xml
new file mode 100644
index 0000000..c85feeb
--- /dev/null
+++ b/test-cli/.idea/dictionaries/hfernandez.xml
@@ -0,0 +1,3 @@
+<component name="ProjectDictionaryState">
+ <dictionary name="hfernandez" />
+</component> \ No newline at end of file
diff --git a/test-cli/.idea/inspectionProfiles/profiles_settings.xml b/test-cli/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..dd4c951
--- /dev/null
+++ b/test-cli/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,7 @@
+<component name="InspectionProjectProfileManager">
+ <settings>
+ <option name="PROJECT_PROFILE" value="Default" />
+ <option name="USE_PROJECT_PROFILE" value="false" />
+ <version value="1.0" />
+ </settings>
+</component> \ No newline at end of file
diff --git a/test-cli/.idea/misc.xml b/test-cli/.idea/misc.xml
new file mode 100644
index 0000000..3999087
--- /dev/null
+++ b/test-cli/.idea/misc.xml
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="JavaScriptSettings">
+ <option name="languageLevel" value="ES6" />
+ </component>
+ <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.6" project-jdk-type="Python SDK" />
+</project> \ No newline at end of file
diff --git a/test-cli/.idea/modules.xml b/test-cli/.idea/modules.xml
new file mode 100644
index 0000000..fc2e93a
--- /dev/null
+++ b/test-cli/.idea/modules.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="ProjectModuleManager">
+ <modules>
+ <module fileurl="file://$PROJECT_DIR$/.idea/test-cli.iml" filepath="$PROJECT_DIR$/.idea/test-cli.iml" />
+ </modules>
+ </component>
+</project> \ No newline at end of file
diff --git a/test-cli/.idea/test-cli.iml b/test-cli/.idea/test-cli.iml
new file mode 100644
index 0000000..218fb9e
--- /dev/null
+++ b/test-cli/.idea/test-cli.iml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+ <component name="NewModuleRootManager">
+ <content url="file://$MODULE_DIR$" />
+ <orderEntry type="jdk" jdkName="Python 3.6" jdkType="Python SDK" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ </component>
+ <component name="PyDocumentationSettings">
+ <option name="format" value="GOOGLE" />
+ <option name="myDocStringFormat" value="Google" />
+ </component>
+</module> \ No newline at end of file
diff --git a/test-cli/.idea/vcs.xml b/test-cli/.idea/vcs.xml
new file mode 100644
index 0000000..6c0b863
--- /dev/null
+++ b/test-cli/.idea/vcs.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="VcsDirectoryMappings">
+ <mapping directory="$PROJECT_DIR$/.." vcs="Git" />
+ </component>
+</project> \ No newline at end of file
diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml
new file mode 100644
index 0000000..1d513ac
--- /dev/null
+++ b/test-cli/.idea/workspace.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="ChangeListManager">
+ <list default="true" id="4991a6e0-1b9d-4824-9b6e-5ac031eb4816" name="Default Changelist" comment="">
+ <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
+ <change beforePath="$PROJECT_DIR$/test/tests/qusbdual.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/tests/qusbdual.py" afterDir="false" />
+ <change beforePath="$PROJECT_DIR$/test_main.py" beforeDir="false" afterPath="$PROJECT_DIR$/test_main.py" afterDir="false" />
+ </list>
+ <option name="SHOW_DIALOG" value="false" />
+ <option name="HIGHLIGHT_CONFLICTS" value="true" />
+ <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
+ <option name="LAST_RESOLUTION" value="IGNORE" />
+ </component>
+ <component name="FileTemplateManagerImpl">
+ <option name="RECENT_TEMPLATES">
+ <list>
+ <option value="Python Script" />
+ </list>
+ </option>
+ </component>
+ <component name="Git.Settings">
+ <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
+ </component>
+ <component name="ProjectId" id="1YfjUHHfmr4Muw0TDMT8ItsS6oU" />
+ <component name="ProjectLevelVcsManager" settingsEditedManually="true">
+ <ConfirmationsSetting value="2" id="Add" />
+ </component>
+ <component name="ProjectViewState">
+ <option name="hideEmptyMiddlePackages" value="true" />
+ <option name="showLibraryContents" value="true" />
+ </component>
+ <component name="PropertiesComponent">
+ <property name="ASKED_ADD_EXTERNAL_FILES" value="true" />
+ <property name="RunOnceActivity.OpenProjectViewOnStart" value="true" />
+ <property name="RunOnceActivity.ShowReadmeOnStart" value="true" />
+ <property name="SHARE_PROJECT_CONFIGURATION_FILES" value="true" />
+ <property name="WebServerToolWindowFactoryState" value="false" />
+ <property name="last_opened_file_path" value="$PROJECT_DIR$/test/tests" />
+ <property name="settings.editor.selected.configurable" value="configurable.group.appearance" />
+ </component>
+ <component name="RecentsManager">
+ <key name="CopyFile.RECENT_KEYS">
+ <recent name="$PROJECT_DIR$/test/tests" />
+ <recent name="$PROJECT_DIR$/test/tasks" />
+ <recent name="$PROJECT_DIR$/test/helpers" />
+ </key>
+ <key name="MoveFile.RECENT_KEYS">
+ <recent name="$PROJECT_DIR$/test/scripts" />
+ </key>
+ </component>
+ <component name="RunManager">
+ <configuration name="test" type="PythonConfigurationType" factoryName="Python">
+ <module name="test-cli" />
+ <option name="INTERPRETER_OPTIONS" value="" />
+ <option name="PARENT_ENVS" value="true" />
+ <envs>
+ <env name="PYTHONUNBUFFERED" value="1" />
+ </envs>
+ <option name="SDK_HOME" value="/usr/bin/python3.6" />
+ <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
+ <option name="IS_MODULE_SDK" value="false" />
+ <option name="ADD_CONTENT_ROOTS" value="true" />
+ <option name="ADD_SOURCE_ROOTS" value="true" />
+ <option name="SCRIPT_NAME" value="$PROJECT_DIR$/test_main.py" />
+ <option name="PARAMETERS" value="" />
+ <option name="SHOW_COMMAND_LINE" value="false" />
+ <option name="EMULATE_TERMINAL" value="false" />
+ <option name="MODULE_MODE" value="false" />
+ <option name="REDIRECT_INPUT" value="false" />
+ <option name="INPUT_FILE" value="" />
+ <method v="2" />
+ </configuration>
+ </component>
+ <component name="SvnConfiguration">
+ <configuration />
+ </component>
+ <component name="TaskManager">
+ <task active="true" id="Default" summary="Default task">
+ <changelist id="4991a6e0-1b9d-4824-9b6e-5ac031eb4816" name="Default Changelist" comment="" />
+ <created>1583340556211</created>
+ <option name="number" value="Default" />
+ <option name="presentableId" value="Default" />
+ <updated>1583340556211</updated>
+ <workItem from="1583340557592" duration="4841000" />
+ </task>
+ <servers />
+ </component>
+ <component name="TypeScriptGeneratedFilesManager">
+ <option name="version" value="1" />
+ </component>
+ <component name="VcsManagerConfiguration">
+ <option name="ADD_EXTERNAL_FILES_SILENTLY" value="true" />
+ </component>
+ <component name="WindowStateProjectService">
+ <state x="668" y="354" width="640" height="396" key="#com.intellij.fileTypes.FileTypeChooser" timestamp="1593588611185">
+ <screen x="67" y="27" width="1853" height="1053" />
+ </state>
+ <state x="668" y="354" width="640" height="396" key="#com.intellij.fileTypes.FileTypeChooser/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1593588611185" />
+ <state x="726" y="301" width="524" height="502" key="#com.intellij.refactoring.safeDelete.UnsafeUsagesDialog" timestamp="1593533225414">
+ <screen x="67" y="27" width="1853" height="1053" />
+ </state>
+ <state x="726" y="301" width="524" height="502" key="#com.intellij.refactoring.safeDelete.UnsafeUsagesDialog/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1593533225414" />
+ <state x="482" y="187" key="SettingsEditor" timestamp="1594023927905">
+ <screen x="67" y="27" width="1853" height="1053" />
+ </state>
+ <state x="482" y="187" key="SettingsEditor/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1594023927905" />
+ <state width="498" height="446" key="SwitcherDM" timestamp="1594047303932">
+ <screen x="67" y="27" width="1853" height="1053" />
+ </state>
+ <state width="498" height="446" key="SwitcherDM/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1594047303932" />
+ <state x="607" y="223" width="762" height="658" key="com.intellij.openapi.editor.actions.MultiplePasteAction$ClipboardContentChooser" timestamp="1593071903123">
+ <screen x="67" y="27" width="1853" height="1053" />
+ </state>
+ <state x="607" y="223" width="762" height="658" key="com.intellij.openapi.editor.actions.MultiplePasteAction$ClipboardContentChooser/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1593071903123" />
+ <state x="657" y="198" width="670" height="676" key="search.everywhere.popup" timestamp="1593083588598">
+ <screen x="67" y="27" width="1853" height="1053" />
+ </state>
+ <state x="657" y="198" width="670" height="676" key="search.everywhere.popup/67.27.1853.1053/1920.0.1920.1080@67.27.1853.1053" timestamp="1593083588598" />
+ </component>
+</project> \ No newline at end of file
diff --git a/test-cli/test/suites/__init__.py b/test-cli/__init__.py
index e69de29..e69de29 100644
--- a/test-cli/test/suites/__init__.py
+++ b/test-cli/__init__.py
diff --git a/test-cli/setup.xml b/test-cli/setup.xml
index 41295b5..063d3e1 100644
--- a/test-cli/setup.xml
+++ b/test-cli/setup.xml
@@ -1,9 +1,38 @@
<?xml version="1.0"?>
<data>
<setup>
- <test idline="1"/> <!-- Test line identify -->
- <board model="IGEP0000" variant="TEST-TEST" station="1"/>
- <db dbname="testsrv" type="PgSQLConnection" host="192.168.2.79" port="5432" user="admin" password="Idkfa2009" /> <!-- database setup -->
+ <test_main qr_wait_time="240" autotest="0" OverrideAutoReboot="1" delayReboot="1" wait_test_start="240" overrideCheckWaitStart="1"/>
+ <db dbname="testsrv" type="PgSQLConnection" host="dbtest01.isee.biz" port="5432" user="admin" password="Idkfa2009" />
+ <qaudio play_dtmf_file="/root/hwtest-files/dtmf-13579.wav" to="/mnt/station_ramdisk" record_dtmf_file="dtmf_recorded_{}.wav" recordtime="2.00" rate="8000" aplay_run_delay="0.1"/>
+ <qusb loops="3" pathfrom="/root/hwtest-files" file="usb-test.bin" file_md5="usb-test.bin.md5" pathto="/mnt/test/disk2"/>
+ <qusb2 loops="3" pathfrom="/mnt/test/disk1" file="usb-test.bin" file_md5="usb-test.bin.md5" pathto="/nfs/station/tmp"/>
+ <qdmesg syslog_dmesg="/var/log/kern.log"/>
+ <qram loops="1" memsize="100M" memtesterPath="/root/hwtest-files/memtester" to="/mnt/station_ramdisk" ram_res_file="ram_res.txt"/>
+ <qnand nandtestPath="/root/hwtest-files/nandtest"/>
+ <NAND_Flasher name="NAND_Flasher" igep_flash="/usr/bin/igep-flash" skipnandtest="1"/>
+ <EEProm_Flasher name="EEPROM_Flasher"/>
+ <qwifi loops="1" interface="wlan0" bwexpected="5.0" port="5000" serverip="192.168.5.4" duration="10" to="/mnt/station_ramdisk" wifi_res_file="eth_test_{}.json" wifi_st_file="eth_st_{}.json" retries="5" wait_retry="10"/>
+ <qeth loops="1" interface="eth0" bwexpected="5.0" port="5000" serverip="192.168.60.3" duration="10" to="/mnt/station_ramdisk" eth_res_file="eth_test_{}.json" eth_st_file="eth_st_{}.json" retries="5" wait_retry="10"/>
+ <qeeprom i2c_address="0050" i2c_bus="0"/>
+ <qemmc device="mmcblk0" to="/mnt/station_ramdisk" emmc_res_file="emmc_status.txt"/>
+ <qplc firmwarepath="/root/hwtest-files/firmware" firmware="" firmware_md5=""
+ factory_tool="/root/hwtest-files/apps/configlayer" gen_ip="0" gen_mac="1"
+ mtd_device="/dev/mtd0" factory_password="betera" firmware_password="paterna"
+ skipflash="0"
+ />
+ <qvideo sdl_driver="x11" sdl_display=":0" capture_size_w="1280" capture_size_h="1024" camdevice="video0"
+ cam_setupfile="/root/hwtest-files/board/scripts/v4l-cam.sh"
+ capture_discardframes="3" capture_framemean="3" capture_maxfailed="1"
+ RED_min_count="50000" BLUE_min_count="50000" GREEN_min_count="50000"
+ />
</setup>
+ <usb>
+ <usbdev MANUFACTURER="Linux 4.9.81+ ohci_hcd" PRODUCT="OHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0001" ignore="1"/>
+ <usbdev MANUFACTURER="Linux 4.9.81+ musb-hcd" PRODUCT="MUSB HDRC host driver" CLASS="09" VENDOR="1d6b" PRODID="0002" ignore="1"/>
+ <usbdev MANUFACTURER="Linux 4.9.81+ ehci_hcd" PRODUCT="EHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0002" ignore="1"/>
+ <usbdev MANUFACTURER="Linux 4.15.0-50-generic ehci_hcd" PRODUCT="EHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0002" ignore="1"/>
+ <usbdev MANUFACTURER="Linux 4.15.0-50-generic uhci_hcd" PRODUCT="UHCI Host Controller" CLASS="09" VENDOR="1d6b" PRODID="0001" ignore="1"/>
+ <usbdev MANUFACTURER="Linux 4.9.185+ musb-hcd" PRODUCT="MUSB HDRC host driver" CLASS="09" VENDOR="1d6b" PRODID="0002" ignore="1"/>
+ </usb>
</data>
diff --git a/test-cli/test/__init__.pyc b/test-cli/test/__init__.pyc
index cd50092..3248efc 100644
--- a/test-cli/test/__init__.pyc
+++ b/test-cli/test/__init__.pyc
Binary files differ
diff --git a/test-cli/test/enums/StationStates.py b/test-cli/test/enums/StationStates.py
new file mode 100644
index 0000000..88dba1f
--- /dev/null
+++ b/test-cli/test/enums/StationStates.py
@@ -0,0 +1,22 @@
+from enum import Enum, unique
+
+
+@unique
+class StationStates(Enum):
+ UNDEFINED = 1000
+ FTP_KERNEL = 1
+ FTP_DTB = 2
+ MOUNT_NFS = 3
+ KERNEL_BOOT = 4
+ WAIT_TEST_START = 5
+ STATION_ERROR = 2000
+ TESTS_CHECKING_ENV = 6
+ TESTS_RUNNING = 10
+ TESTS_FAILED = 40
+ TESTS_OK = 30
+ EXTRATASKS_RUNNING = 100
+ WAITING_FOR_SCANNER = 50
+ FINISHED = 60
+ STATION_REBOOT = 2001
+ STATION_WAIT_SHUTDOWN = 2002
+
diff --git a/test-cli/test/helpers/uboot_flasher.py b/test-cli/test/enums/__init__.py
index e69de29..e69de29 100644
--- a/test-cli/test/helpers/uboot_flasher.py
+++ b/test-cli/test/enums/__init__.py
diff --git a/test-cli/test/files/OpenSans-Regular.ttf b/test-cli/test/files/OpenSans-Regular.ttf
new file mode 100644
index 0000000..db43334
--- /dev/null
+++ b/test-cli/test/files/OpenSans-Regular.ttf
Binary files differ
diff --git a/test-cli/test/files/dtmf-13579.wav b/test-cli/test/files/dtmf-13579.wav
deleted file mode 100644
index 1ca5b93..0000000
--- a/test-cli/test/files/dtmf-13579.wav
+++ /dev/null
Binary files differ
diff --git a/test-cli/test/files/test_ok.png b/test-cli/test/files/test_ok.png
deleted file mode 100644
index 6ccc7b3..0000000
--- a/test-cli/test/files/test_ok.png
+++ /dev/null
Binary files differ
diff --git a/test-cli/test/files/test_pattern.png b/test-cli/test/files/test_pattern.png
deleted file mode 100644
index 353aab5..0000000
--- a/test-cli/test/files/test_pattern.png
+++ /dev/null
Binary files differ
diff --git a/test-cli/test/helpers/DiskHelpers.py b/test-cli/test/helpers/DiskHelpers.py
new file mode 100644
index 0000000..79b661e
--- /dev/null
+++ b/test-cli/test/helpers/DiskHelpers.py
@@ -0,0 +1,9 @@
+import stat
+import os
+
+
+def disk_exists(path):
+ try:
+ return stat.S_ISBLK(os.stat(path).st_mode)
+ except:
+ return False
diff --git a/test-cli/test/helpers/__init__.pyc b/test-cli/test/helpers/__init__.pyc
index 7d1c907..01014e2 100644
--- a/test-cli/test/helpers/__init__.pyc
+++ b/test-cli/test/helpers/__init__.pyc
Binary files differ
diff --git a/test-cli/test/helpers/amper.py b/test-cli/test/helpers/amper.py
new file mode 100644
index 0000000..ea719f6
--- /dev/null
+++ b/test-cli/test/helpers/amper.py
@@ -0,0 +1,154 @@
+import serial
+import scanf
+
+
+class Amper(object):
+ __ser = None
+ __port = '/dev/ttyUSB0'
+ __speed = 115200
+ __parity = None
+ __rtscts = 0
+ __timeout = 1
+ __isThere = False
+ __version = None
+ __voltage = 0.0
+ __current = 0.0
+ __alarm_condition = 0
+
+ def __init__(self, port='/dev/ttyUSB0', serial_speed=115200, parity=serial.PARITY_NONE, rtscts=0):
+ self.__port = port
+ self.__speed = serial_speed
+ self.__parity = parity
+ self.__rtscts = rtscts
+
+ def open(self):
+ try:
+ if self.__ser is not None:
+ self.close()
+ self.__ser = serial.Serial(port=self.__port,
+ baudrate=self.__speed,
+ timeout=self.__timeout,
+ parity=self.__parity,
+ rtscts=self.__rtscts,
+ bytesize=serial.EIGHTBITS,
+ stopbits=serial.STOPBITS_ONE,
+ xonxoff=0)
+
+ self.__ser.flushInput()
+ self.__ser.flushOutput()
+ self.__ser.break_condition = False
+ return True
+ except serial.SerialException as err:
+ print(err)
+ return False
+
+ def close(self):
+ if self.__ser is not None:
+ self.__ser.close()
+ self.__ser = None
+ self.__isThere = False
+ self.__version = None
+ self.__voltage = 0.0
+ self.__current = 0.0
+ self.__alarm_condition = 0
+
+ def __write(self, data):
+ data += "\n\r"
+ dat = data.encode('ascii')
+ self.__ser.write(dat)
+
+ def __read(self):
+ return self.__ser.readline().decode().rstrip()
+
+ def __set_voltage_underlimit(self, under_limit):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+VIN_UV_W_LIM={}'.format(under_limit))
+ dat = self.__read()
+ print(dat)
+
+ def __get_voltage_underlimit(self):
+ self.__write('AT+VIN_UV_W_LIM=?')
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ return res[0]
+
+ def __set_voltage_overlimit(self, over_limit):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+VIN_OV_W_LIM={}'.format(over_limit))
+ dat = self.__read()
+ print(dat)
+
+ def __get_voltage_overlimit(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+VIN_OV_W_LIM?')
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ return res[0]
+
+ def __set_current_overlimit(self, over_limit):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+IOUT_W_LIM={}'.format(over_limit))
+ dat = self.__read()
+ print(dat)
+
+ def __get_current_overlimit(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+IOUT_W_LIM?')
+ dat = self.__read()
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ return res[0]
+
+ def hello(self):
+ self.__isThere = False
+ if self.__ser is None:
+ return False
+ self.__write('at+ver?')
+ dat = self.__read()
+ res = scanf.scanf("ISEE Amper v%3c", dat)
+ if res is None:
+ return False
+ self.__isThere = True
+ self.__version = res[0]
+ return True
+
+ def getVersion(self):
+ return self.__version
+
+ def getVoltage(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('at+vin?')
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ self.__voltage = res[0]
+ # print(self.__voltage)
+ return self.__voltage
+ else:
+ return None
+
+ def getCurrent(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('at+in?')
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ self.__current = res[0]
+ # print(self.__current)
+ else:
+ return None
+ return self.__current
+
+ def getAlarm(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('at+st_mfr?')
+ dat = self.__read()
+ # print(dat)
+ self.__alarm_condition = (int(dat) & 0x0F)
+ # print(self.__alarm_condition)
+ return self.__alarm_condition
+
+ def clearAlarm(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('at+CLRFAULT')
+ dat = self.__read()
+ # print(dat)
+
diff --git a/test-cli/test/helpers/button_script.sh b/test-cli/test/helpers/button_script.sh
deleted file mode 100644
index 6908f22..0000000
--- a/test-cli/test/helpers/button_script.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/usr/bin/env bash
-i2cset -f -y 1 0x2d 0x40 0x31
-i2cset -f -y 1 0x2d 0x50 0xff
-i2cget -f -y 1 0x2d 0x50
diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py
new file mode 100644
index 0000000..b23df74
--- /dev/null
+++ b/test-cli/test/helpers/camara.py
@@ -0,0 +1,116 @@
+import cv2
+import sh
+
+class Camara(object):
+ __device_name = None
+ __device = None
+ __setupScriptPath = ''
+ __w = 1280
+ __h = 720
+ __contrast = 0.0
+ __brightness = 0.0
+ __saturation = 55.0
+ __hue = 0.0
+ __exposure = 166
+
+ def __init__(self, setup_script_path, device="video0", width=1280, height=720):
+ self.__device_name = device
+ self.__w = width
+ self.__h = height
+ self.__setupScriptPath = setup_script_path
+
+ def Close(self):
+ if self.__device is not None:
+ del self.__device
+ self.__device = None
+
+ def Open(self):
+ self.Close()
+ self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name))
+ if self.__device.isOpened():
+ self.__configure()
+ return True
+ return False
+
+ def getSize(self):
+ return self.__w, self.__h
+
+ def setSize(self, w, h):
+ if self.__device is not None and self.__device.isOpened():
+ self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w)
+ self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h)
+ else:
+ self.__w = w
+ self.__h = h
+
+ def setContrast(self, newVal):
+ if self.__device.isOpened():
+ self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal)
+ else:
+ self.__contrast = newVal
+
+ def getContrast(self):
+ return self.__contrast
+
+ def setBrightness(self, newVal):
+ if self.__device.isOpened():
+ self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal)
+ else:
+ self.__brightness = newVal
+
+ def getBrightness(self):
+ return self.__brightness
+
+ def __configure(self):
+ self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w)
+ self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h)
+ sh.bash(self.__setupScriptPath)
+
+ def __setCamVar(self, key, val):
+ valold = cv2.VideoCapture.get(self.__device, key)
+ if valold != val:
+ cv2.VideoCapture.set(self.__device, key, val)
+ t = cv2.VideoCapture.get(self.__device, key)
+ return t
+ return val
+
+ def __getCamVar(self, key):
+ return cv2.VideoCapture.get(self.__device, key);
+
+ def getFrameCount(self):
+ return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE);
+
+ def getFrame(self):
+ if self.__device.isOpened():
+ retval, image = self.__device.read()
+ if retval:
+ return image
+ return None
+ else:
+ return None
+
+ def getImageSize(self, image):
+ if hasattr(image, 'shape'):
+ return image.shape[1], image.shape[0]
+ else:
+ return (0, 0, 0)
+
+ def showFrame(self, name, frame, w=False, maxTime=3000):
+ cv2.imshow(name, frame)
+ if w:
+ if maxTime == -1:
+ cv2.waitKey()
+ else:
+ cv2.waitKey(maxTime)
+
+ def saveFrame(self, fileName, Frame):
+ cv2.imwrite(fileName, Frame)
+
+ def readImage(self, filename):
+ return cv2.imread('{}'.format(filename), 0)
+
+ def destroyWindow(self, name):
+ cv2.destroyWindow(name)
+
+ def closeWindows(self):
+ cv2.destroyAllWindows() \ No newline at end of file
diff --git a/test-cli/test/helpers/changedir.py b/test-cli/test/helpers/changedir.py
new file mode 100644
index 0000000..fad9ade
--- /dev/null
+++ b/test-cli/test/helpers/changedir.py
@@ -0,0 +1,14 @@
+import os
+
+
+class changedir:
+ """Context manager for changing the current working directory"""
+ def __init__(self, newPath):
+ self.newPath = os.path.expanduser(newPath)
+
+ def __enter__(self):
+ self.savedPath = os.getcwd()
+ os.chdir(self.newPath)
+
+ def __exit__(self, etype, value, traceback):
+ os.chdir(self.savedPath) \ No newline at end of file
diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py
new file mode 100644
index 0000000..db94a1c
--- /dev/null
+++ b/test-cli/test/helpers/cmdline.py
@@ -0,0 +1,28 @@
+
+class LinuxKernelCmd:
+
+ __kernel_vars = {}
+
+ def __init__(self):
+ self.parse_kernel_cmdline()
+
+ def parse_kernel_cmdline(self):
+ cmdline = open('/proc/cmdline', 'rb').read().decode()
+ cmd_array = cmdline.split()
+ i = 0
+ for f in cmdline.split():
+ pos = f.find('=')
+ if pos == -1:
+ cmd_array[i - 1] = cmd_array[i - 1] + " " + f
+ cmd_array.remove(cmd_array[i])
+ else:
+ i += 1
+ self.__kernel_vars = {}
+ for f in cmd_array:
+ dat = f.split('=')
+ self.__kernel_vars[dat[0]] = dat[1]
+
+ def getkvar(self, vName, vdefault):
+ if vName in self.__kernel_vars:
+ return self.__kernel_vars[vName]
+ return vdefault
diff --git a/test-cli/test/helpers/cv_display_test.py b/test-cli/test/helpers/cv_display_test.py
index b54a698..3a3004f 100644
--- a/test-cli/test/helpers/cv_display_test.py
+++ b/test-cli/test/helpers/cv_display_test.py
@@ -14,8 +14,7 @@ def pattern_detect(cam_device=0):
# RETURN 0 only if the test is ok
msg="0"
# Capture the corresponding camera device [0,1]
- #capture = cv2.VideoCapture(0)
- capture = cv2.VideoCapture("/dev/v4l/by-id/usb-Creative_Technology_Ltd._Live__Cam_Sync_HD_VF0770-video-index0")
+ capture = cv2.VideoCapture(0)
try:
_, image = capture.read()
except:
@@ -64,12 +63,10 @@ def pattern_detect(cam_device=0):
# After testing the gamma factor correction is computed with the green color
# gamma = gamma_factor / blue+red into green part
# gamma factor = 50-100
- #gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1))
- gamma=1
+ gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1))
# Adjust the image acording to this gamma value
adjusted = adjust_gamma(image, gamma=gamma)
-# adjusted=image
- cv2.imwrite( "/home/root/result_hdmi_img.jpg", adjusted);
+
# Calculate again the average color using the gamma adjusted image
# Crop the gamma adjusted image for wach color section
red_cal = adjusted[y1:y2, xr1:xr2]
@@ -109,19 +106,14 @@ def pattern_detect(cam_device=0):
mask_r = mask0 + mask1
# mask_r = cv2.inRange(hsv, lower_red, upper_red)
# Perform a morphological open to expand
-# kernel = np.ones((5, 5), np.uint8)
-# closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel)
-# closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel)
-# closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel)
+ kernel = np.ones((15, 15), np.uint8)
+ closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel)
+ closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel)
+ closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel)
# Count the number of pixels that are not of the corresponding color (black)
-# count_r = cv2.countNonZero(closing_r)
-# count_b = cv2.countNonZero(closing_b)
-# count_g = cv2.countNonZero(closing_g)
- #-----------
- count_r = cv2.countNonZero(mask_r)
- count_b = cv2.countNonZero(mask_b)
- count_g = cv2.countNonZero(mask_g)
- #-------
+ count_r = cv2.countNonZero(closing_r)
+ count_b = cv2.countNonZero(closing_b)
+ count_g = cv2.countNonZero(closing_g)
if (count_r < 5):
msg = "RED COUNT FAIL"
return msg
diff --git a/test-cli/test/helpers/detect.py b/test-cli/test/helpers/detect.py
new file mode 100644
index 0000000..193dabf
--- /dev/null
+++ b/test-cli/test/helpers/detect.py
@@ -0,0 +1,61 @@
+import cv2
+import numpy as np
+
+
+class Detect_Color(object):
+ oFrame = None
+ img_hsv = None
+ __red_lower1 = [0, 50, 20]
+ __red_upper1 = [5, 255, 255]
+ __red_lower2 = [175, 50, 20]
+ __red_upper2 = [180, 255, 255]
+ __blue_lower = [110, 50, 50]
+ __blue_upper = [130, 255, 255]
+ __green_lower = [36, 25, 25]
+ __green_upper = [86, 255, 255]
+
+ def __init__(self, frame):
+ self.oFrame = frame
+ self.__hist()
+
+ def __hist(self):
+ self.img_hsv = cv2.cvtColor(self.oFrame, cv2.COLOR_BGR2HSV)
+
+ def getRed(self):
+ return self.__detect_two_areas([0, 50, 20], [5, 255, 255], [175, 50, 20], [180, 255, 255])
+
+ def getBlue(self):
+ return self.__detect_one_area([110, 50, 50], [130, 255, 255])
+
+ def getGreen(self):
+ return self.__detect_one_area([36, 25, 25], [86, 255, 255])
+
+ def __detect_one_area(self, lower, upper):
+ a_lower = np.array(lower)
+ a_upper = np.array(upper)
+ c_mask = cv2.inRange(self.img_hsv, a_lower, a_upper)
+ croped = cv2.bitwise_and(self.oFrame, self.oFrame, mask=c_mask)
+ mean_v = cv2.mean(self.oFrame, mask=c_mask)
+ mean = {}
+ mean['R'] = mean_v[2]
+ mean['G'] = mean_v[1]
+ mean['B'] = mean_v[0]
+ count = cv2.countNonZero(c_mask)
+ return c_mask, croped, mean, count
+
+ def __detect_two_areas(self, lower1, upper1, lower2, upper2):
+ a1_lower = np.array(lower1)
+ a1_upper = np.array(upper1)
+ a2_lower = np.array(lower2)
+ a2_upper = np.array(upper2)
+ c_mask1 = cv2.inRange(self.img_hsv, a1_lower, a1_upper)
+ c_mask2 = cv2.inRange(self.img_hsv, a2_lower, a2_upper)
+ c_mask = cv2.bitwise_or(c_mask1, c_mask2)
+ croped = cv2.bitwise_and(self.oFrame, self.oFrame, mask=c_mask)
+ mean_v = cv2.mean(self.oFrame, mask=c_mask)
+ mean = {}
+ mean['R'] = mean_v[2]
+ mean['G'] = mean_v[1]
+ mean['B'] = mean_v[0]
+ count = cv2.countNonZero(c_mask)
+ return c_mask, croped, mean, count
diff --git a/test-cli/test/helpers/finisher.py b/test-cli/test/helpers/finisher.py
deleted file mode 100644
index 73142d9..0000000
--- a/test-cli/test/helpers/finisher.py
+++ /dev/null
@@ -1,151 +0,0 @@
-from test.helpers.syscmd import SysCommand
-from test.helpers.globalVariables import globalVar
-import binascii
-import uuid
-import subprocess
-from PIL import Image, ImageDraw, ImageFont
-import qrcode
-import PIL
-
-class Finisher(object):
- __muhb = None
- __final_to_burn_to_eeprom = None
- __qr_image = None
-
- def __init__(self, muhb = None):
- self.__muhb = muhb
- self.__final_to_burn_to_eeprom = bytearray()
- self.__qr_image = None
- pass
-
- def eeprom_burn(self):
- ## Create binary file
- str_cmd2 = "find /sys/ -iname 'eeprom'"
- eeprom_location = SysCommand("eeprom_location", str_cmd2)
- if eeprom_location.execute() == 0:
- raw_out = eeprom_location.getOutput()
- if raw_out == "":
- self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?")
- eeprom = raw_out.decode('ascii')
- eeprom = eeprom.strip('\n')
- ## push binary data to eeprom like if working with files
- file2 = open(eeprom, "w+b")
- file2.write(self.__final_to_burn_to_eeprom)
- else:
- self.fail("failed: could not complete find eeprom command")
-
- def generate_qr_stamp(self):
- # Generate QR to put in a stamp
- qr = qrcode.QRCode(
- version=1,
- error_correction=qrcode.constants.ERROR_CORRECT_L,
- box_size=10,
- border=4,
- )
- # Use board_uuid to generate the QR code
- qr.add_data(globalVar.g_uuid)
- qr.make(fit=True)
- # Save QR as image stamp.png
- img = qr.make_image()
- # Store QR as a class atrib
- self.__qr_image = img
- img.save('/home/root/stamp.png')
-
- def print_stamp(self):
- # Print stamp by sending a cmd to the printer
- str_cmd3 = "lpr -o scaling=4 stamp.png".format(self.__display)
- printstamp = SysCommand("printstamp", str_cmd3)
- if printstamp.execute() != 0:
- self.fail("failed: could not print stamp")
-
- def generate_screen(self):
- # Generate green image with board uuid and QR maybe pasted on top of green image
- # Define image size
- W = 1250
- H = 703
- # Create blank rectangle to write on
- image = Image.new('RGB', (W, H), (46, 204, 113, 0))
- draw = ImageDraw.Draw(image)
- message = "TEST OK\n\n" + "Board UUID: "+ globalVar.g_uuid
- font = ImageFont.truetype('/usr/share/fonts/truetype/dejavuu/DejaVuSans.ttf', 40)
- # Calculate the width and height of the text to be drawn, given font size
- w, h = draw.textsize(message, font=font)
- # Write the text to the image, where (x,y) is the top left corner of the text
- draw.text(((W-w)/2,(H-h)/2), message, align='center', font=font)
- #draw.image(self.__qr_image)
- image.save('/home/root/test/files/test_ok.png')
-
- def show_result_screen(self):
- # If test OK show test_OK.png located in /home/root, If test fail show test_fail.png, located in /home/root/test/files/test_KO.png
- if globalVar.fstatus:
- str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ok.png".format('fb0')
- else:
- str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ko.png".format('fb0')
-
- display_image = SysCommand("display_image", str_cmd4)
- #print(display_image.execute())
- if display_image.execute() != -1:
- self.fail("failed: could not display the image")
-
-
- def end_ok(self):
- # Burn retrieved igep eeprom struct
- new = self.__muhb[0][0]
-
- # Convert from string to hex
- hnew = new.encode()
- # Magic_id and default crc32 0x6d, 0x6a, 0x6d, 0xe4 with endianess changed so that u-boot loads it correctly
- # IF magic ever changes this magic_id should be changed. At the end always magic_id of test and u-boot should
- # be the same
- magic_id = bytes([0xe4, 0x6d, 0x6a, 0x6d])
- default_igep_crc32 = bytes([0x00, 0x00, 0x00, 0x00])
-
- # Create bytearray
- to_calculate = bytearray()
- # Build the final hex binary for crc32 operation
- to_calculate.extend(magic_id)
- to_calculate.extend(default_igep_crc32)
- to_calculate.extend(hnew)
-
- # Calculate crc32!
- new_crc32 = binascii.crc32(to_calculate)
- hnew_crc32 = new_crc32.to_bytes(4, byteorder="little")
-
- # Recreate final eeprom struct in bytearray
- self.__final_to_burn_to_eeprom = bytearray()
- self.__final_to_burn_to_eeprom.extend(magic_id)
- self.__final_to_burn_to_eeprom.extend(hnew_crc32)
- self.__final_to_burn_to_eeprom.extend(hnew)
- self.eeprom_burn()
-
- # Generate QR stamp
- self.generate_qr_stamp()
- # Send print stamp command
- #self.print_stamp()
- # Generate green image with board uuid and QR maybe pasted on top of green image
- self.generate_screen()
- # Show test_ok.png image in screen
- self.show_result_screen()
-
- def end_fail(self):
- # Burn igep eeprom bug struct
- hnew = self.__muhb.encode()
- default_fail = bytes([0xD0, 0xBA, 0xD0, 0xBA])
- self.__final_burn_to_eeprom = bytearray()
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(hnew)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.eeprom_burn()
- # Show test_ko.png image in screen
- self.show_result_screen() \ No newline at end of file
diff --git a/test-cli/test/helpers/get_dieid.py b/test-cli/test/helpers/get_dieid.py
deleted file mode 100644
index b20f143..0000000
--- a/test-cli/test/helpers/get_dieid.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import mmap
-import os
-import struct
-MAP_MASK = mmap.PAGESIZE - 1
-WORD = 4
-def read(addr):
- """ Read from any location in memory
- Returns the readed value in hexadecimal format
- Keyword arguments:
- - addr: The memory address to be readed.
- """
- fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC)
- # Map one page
- mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK)
- mm.seek(addr & MAP_MASK)
- retval = struct.unpack('I', mm.read(WORD))
- mm.close()
- os.close(fd)
- return "%08X" % retval[0]
-
-def getRegisters(model):
- if model.find("IGEP0046") == 0:
- registers = [0x021BC420, 0x021BC410]
- elif model.find("IGEP0000") == 0:
- registers = [0x021BC420, 0x021BC410]
- elif model.find("IGEP0034") == 0 or model.find("SOPA0000") == 0:
- registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C]
- elif model.find("OMAP3") == 0:
- registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218]
- elif model.find("OMAP5") == 0:
- registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200]
- return registers
-
-def genDieid(modelid):
- registers=getRegisters(modelid)
- id=""
- for i in range(len(registers)):
- id=id+(read(registers[i]))
- return id
-
-#if __name__ == "__main__":
- #registers = [0x021BC420, 0x021BC410]
- #print(main(registers))
diff --git a/test-cli/test/helpers/globalVariables.py b/test-cli/test/helpers/globalVariables.py
index c4d8358..baaae57 100644
--- a/test-cli/test/helpers/globalVariables.py
+++ b/test-cli/test/helpers/globalVariables.py
@@ -6,5 +6,4 @@ def globalVar():
g_mid = ""
outdata = "NONE"
station = ""
- fstatus = ""
- gdisplay = None \ No newline at end of file
+ taskid_ctl = ""
diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py
new file mode 100644
index 0000000..f127f2b
--- /dev/null
+++ b/test-cli/test/helpers/gpio.py
@@ -0,0 +1,55 @@
+import os
+
+class gpio (object):
+ gpioNum = None
+
+ def __init__(self, gpioNum, dir, val):
+ self.gpioNum = gpioNum
+ self.__export(gpioNum)
+ self.__set_dir(gpioNum, dir)
+ if dir == 'out':
+ self.__set_value(gpioNum, val)
+
+ def set_val(self, value):
+ self.__set_value( self.gpioNum, value)
+
+
+ def __export(self, gpio_number):
+ if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)):
+ try:
+ f = open("/sys/class/gpio/export", "w", newline="\n")
+ f.write(str(gpio_number))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __unexport(self, gpio_number):
+ if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)):
+ try:
+ f = open("/sys/class/gpio/unexport", "w", newline="\n")
+ f.write(str(gpio_number))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __set_dir(self, gpio_number, dir):
+ try:
+ f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n")
+ val = f.readline()
+ if val != dir:
+ f.write(dir)
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __set_value(self, gpio_number, value):
+ try:
+ f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n")
+ f.write(str(value))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True \ No newline at end of file
diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py
new file mode 100644
index 0000000..133387c
--- /dev/null
+++ b/test-cli/test/helpers/int_registers.py
@@ -0,0 +1,68 @@
+import mmap
+import os
+import struct
+import sh
+
+MAP_MASK = mmap.PAGESIZE - 1
+WORD = 4
+
+
+def read(addr):
+ """ Read from any location in memory
+ Returns the readed value in hexadecimal format
+ Keyword arguments:
+ - addr: The memory address to be readed.
+ """
+ fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC)
+ # Map one page
+ mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK)
+ mm.seek(addr & MAP_MASK)
+ retval = struct.unpack('I', mm.read(WORD))
+ mm.close()
+ os.close(fd)
+ return "%08X" % retval[0]
+
+def imx8m_readid():
+ f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n")
+ val = f.readline()
+ f.close()
+ return val.rstrip()
+
+
+def get_die_id(modelid):
+ dieid = ""
+
+ # get registers
+ if modelid.find("IGEP0046") == 0:
+ registers = [0x021BC420, 0x021BC410]
+ elif modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0:
+ # registers: mac_id0_lo, mac_id0_hi, mac_id1_lo, mac_id1_hi
+ registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C]
+ elif modelid.find("OMAP3") == 0 or modelid.find("IGEP0020") == 0 or modelid.find("IGEP0030") == 0:
+ registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218]
+ elif modelid.find("OMAP5") == 0:
+ registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200]
+ elif modelid.find("IGEP0048") == 0:
+ return imx8m_readid()
+ else:
+ raise Exception('modelid not defined: {}, modelid')
+ for rg in registers:
+ dieid = dieid + read(rg)
+ return dieid
+
+
+def get_internal_mac(modelid):
+ mac = None
+
+ if modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0:
+ # # registers: mac_id0_lo, mac_id0_hi
+ # registers = [0x44e10630, 0x44e10634]
+ # mac = ""
+ # for rg in registers:
+ # mac = mac + read(rg)
+ # #erase trailing zeros
+ # mac = mac[4::1]
+ # # To be finished...
+ mac = sh.cat("/sys/class/net/eth0/address").strip()
+
+ return mac
diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py
new file mode 100644
index 0000000..4b1087c
--- /dev/null
+++ b/test-cli/test/helpers/iseelogger.py
@@ -0,0 +1,62 @@
+import logging
+import logging.handlers
+import datetime
+
+class ISEE_Logger(object):
+ __logger = None
+ __logHandler = None
+ __formater = None
+ __logHandlerConsole = None
+
+ def __init__(self, level=logging.INFO):
+ # Create syslog logger
+ self.__logger = logging.getLogger('ISEE_logger')
+ self.__logger.setLevel(level)
+ self.__logHandler = logging.handlers.SysLogHandler('/dev/log')
+ self.__logHandlerConsole = logging.StreamHandler()
+ self.__formater = logging.Formatter('Python: { "loggerName":"%(name)s", "timestamp":"%(asctime)s", "pathName":"%(pathname)s", "logRecordCreationTime":"%(created)f", "functionName":"%(funcName)s", "levelNo":"%(levelno)s", "lineNo":"%(lineno)d", "time":"%(msecs)d", "levelName":"%(levelname)s", "message":"%(message)s"}')
+ self.__logHandler.formatter = self.__formater
+ self.__logHandlerConsole.formatter = self.__formater
+ self.__logger.addHandler(self.__logHandler)
+ self.__logger.addHandler(self.__logHandlerConsole)
+
+ def setLogLevel(self, level):
+ if level.upper() == "DEBUG":
+ nlevel = logging.DEBUG
+ elif level.upper() == "INFO":
+ nlevel = logging.INFO
+ elif level.upper() == "ERROR":
+ nlevel = logging.ERROR
+ elif level.upper() == "WARNING":
+ nlevel = logging.WARNING
+ else:
+ nlevel = logging.DEBUG
+
+ self.__logger.setLevel(nlevel)
+
+ def getlogger(self):
+ return self.__logger
+
+class MeasureTime:
+ __difference = None
+ __first_time = None
+ __later_time = None
+
+ def __init__(self):
+ self.__first_time = datetime.datetime.now()
+
+ def start(self):
+ self.first_time = datetime.datetime.now()
+
+ def stop(self):
+ self.__later_time = datetime.datetime.now()
+ self.__difference = self.__later_time - self.__first_time
+ return self.__difference.total_seconds()
+
+ def getTime(self):
+ return self.__difference.total_seconds()
+
+
+global logObj
+logObj = ISEE_Logger(logging.INFO)
+
diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py
new file mode 100644
index 0000000..8e29448
--- /dev/null
+++ b/test-cli/test/helpers/iw.py
@@ -0,0 +1,124 @@
+from sh import iw
+from scanf import scanf
+
+class iw_scan:
+ __interface = None
+ __raw_unparsed = []
+ __raw = None
+
+ def __init__(self, interface='wlan0'):
+ self.__interface = interface
+
+ def scan(self):
+ self.__raw_unparsed = []
+ self.__raw = iw('{}'.format(self.__interface), 'scan')
+ if self.__raw.exit_code == 0:
+ self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n')
+ return True
+ return False
+
+ def getRawData (self):
+ return self.__raw_unparsed
+
+ def getInterface(self):
+ return self.__interface
+
+ def getRawObject(self):
+ return self.__raw
+
+
+class iwScan:
+ __iw_scan = None
+ __station_list = {}
+
+ def __init__(self, interface='wlan0'):
+ self.__iw_scan = iw_scan(interface)
+
+ def scan(self):
+ if self.__iw_scan.scan():
+ self.__parse_scan(self.__iw_scan.getRawData())
+ return True
+ return False
+
+ def getLastError(self):
+ rObject = self.__iw_scan.getRawObject()
+ if rObject.exit_code != 0:
+ return rObject.stderr
+ return ''
+
+ def __Parse_BBS(self, line):
+ data = {}
+ res = scanf("BSS %s(on %s) -- %s", line)
+ if res == None:
+ res = scanf("BSS %s(on %s)", line)
+ data["MAC"] = res[0]
+ data["DEV"] = res[1]
+ if len(res) == 3:
+ data["ASSOCIATED"] = 'YES'
+ else:
+ data["ASSOCIATED"] = 'NO'
+ return data
+
+ def __Parse_T(self, id, OnNotFoundAttr ,line):
+ data = {}
+ res = scanf("{}: %s".format(id), line)
+ if res == None:
+ data[id.upper()] = OnNotFoundAttr
+ else:
+ data[id.upper()] = res[0]
+ return data
+
+ def __Parse_X(self, line):
+ data = {}
+ res = scanf("DS Parameter set: channel %s", line)
+ if res == None:
+ data['CHANNEL'] = '-1'
+ else:
+ data['CHANNEL'] = res[0]
+ return data
+
+ def __block_stationlist(self, rawdata):
+ list = {}
+ count = 0
+ for line in rawdata:
+ idx = line.find('BSS')
+ if idx != -1 and idx == 0:
+ if len(list) > 0:
+ count += 1
+ self.__station_list[count] = list
+ list = self.__Parse_BBS(line)
+ elif line.find('SSID:') != -1:
+ list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)}
+ elif line.find('freq:') != -1:
+ list = {**list, **self.__Parse_T('freq', '', line)}
+ elif line.find('signal:') != -1:
+ list = {**list, **self.__Parse_T('signal', '', line)}
+ elif line.find('DS Parameter set:') != -1:
+ list = {**list, **self.__Parse_X(line)}
+ if count > 0:
+ count += 1
+ self.__station_list[count] = list
+
+ def __parse_scan(self, rawData):
+ self.__station_list = {}
+ self.__block_stationlist(rawData)
+ #print('{}'.format(self.__station_list))
+ #for i in self.__station_list:
+ # print(self.__station_list[i])
+
+ def findConnected (self):
+ for i in self.__station_list:
+ st = self.__station_list[i]
+ if st.get('ASSOCIATED', 'NO') == 'YES':
+ return True, self.__station_list[i]
+ return False, None
+
+ def findBySSID(self, ssid):
+ for i in self.__station_list:
+ st = self.__station_list[i]
+ if st.get('SSID', '') == ssid:
+ return True, self.__station_list[i]
+ return False, None
+
+ def getStations(self):
+ return self.__station_list \ No newline at end of file
diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py
new file mode 100644
index 0000000..9bc9e23
--- /dev/null
+++ b/test-cli/test/helpers/md5.py
@@ -0,0 +1,8 @@
+import hashlib
+
+def md5_file(fname):
+ hash_md5 = hashlib.md5()
+ with open(fname, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ return hash_md5.hexdigest() \ No newline at end of file
diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py
new file mode 100644
index 0000000..069058d
--- /dev/null
+++ b/test-cli/test/helpers/plc.py
@@ -0,0 +1,82 @@
+import sh
+from sh import flashcp
+from sh import flash_eraseall
+from sh import ErrorReturnCode
+from sh import Command
+from test.helpers.gpio import gpio
+import time
+
+
+class dcpPLC(object):
+ __nReset = None
+ __Phy = None
+ __Plc = None
+ __mtd_device = None
+ __factool = None
+ __myConfigTool = None
+
+ def __init__(self, plcfactool, mtd_device):
+ # default save firmware
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ self.__nRest = gpio('78', 'out', '1')
+ self.__factool = plcfactool
+ self.__mtd_device = mtd_device
+ self.__myConfigTool = Command(self.__factool)
+
+ def setSaveFirmwareMode(self):
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ self.__nRest = gpio('78', 'out', '1')
+
+ def setBootMode(self):
+ self.__nRest = gpio('78', 'out', '0')
+ self.__nRest = gpio('75', 'out', '1')
+ self.__nRest = gpio('69', 'out', '1')
+
+ def setPLCReset(self):
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ time.sleep(1)
+ self.__nRest = gpio('69', 'out', '1')
+ self.__nRest = gpio('75', 'out', '1')
+
+ def SaveFirmware(self, firmare):
+ self.setSaveFirmwareMode()
+ try:
+ flash_eraseall(self.__mtd_device)
+ flashcp(firmare, self.__mtd_device)
+ except ErrorReturnCode as Error:
+ return False, "plc flash firmware failed {} ".format(Error.exit_code)
+ return True, ''
+
+ def set_plc(self, var, value, password):
+ try:
+ res = self.__myConfigTool("-o", "SET", "-p", "{}={}".format(var, value), "-w", "{}".format(password))
+ print(res)
+ except ErrorReturnCode as Error:
+ return False, "set var failed {} {}".format(var, Error.exit_code)
+ return True, ''
+
+ def set_plc2(self, var1, value1, var2, value2, password):
+ try:
+ res = self.__myConfigTool("-o", "SET", "-p", "{}={}".format(var1, value1), "-p",
+ "{}={}".format(var2, value2), "-w", "{}".format(password))
+ print(res)
+ except ErrorReturnCode as Error:
+ return False, "set var failed {}".format(Error.exit_code)
+ return True, ''
+
+ def get_plc(self, var, value, password):
+ try:
+ self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password))
+ except ErrorReturnCode as Error:
+ return False, "set var failed {} {}".format(var, Error.exit_code)
+ return True, ''
+
+ def discover(self):
+ try:
+ self.__myConfigTool("-o", "DISCOVER")
+ except ErrorReturnCode:
+ return False
+ return True
diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py
index 26dd03d..0141414 100644
--- a/test-cli/test/helpers/psqldb.py
+++ b/test-cli/test/helpers/psqldb.py
@@ -1,34 +1,40 @@
import psycopg2
+import psycopg2.extras
+
class PgSQLConnection(object):
- """aaaaaaa"""
+ """Postgres Connection Object"""
__conection_object = None
__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171',
- 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}
+ 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}
- def __init__ (self):
-# self.__conection_object = None
-# if connect_str is not None:
-# self.__db_config = connect_str
-# else:
- self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171',
- 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}
+ def __init__(self, connect_str=None):
+ self.__conection_object = None
+ if connect_str is not None:
+ self.__db_config = connect_str
+ else:
+ self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171',
+ 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}
- def db_connect (self, connect_str):
+ def db_connect(self, connect_str=None):
result = False
try:
if connect_str == None:
self.__conection_object = psycopg2.connect(**self.__db_config)
else:
- self.__db_config = connect_str;
+ self.__db_config = connect_str
self.__conection_object = psycopg2.connect(**self.__db_config)
+
self.__conection_object.autocommit = True
result = True
except Exception as error:
print(error)
return result
+ def getConfig(self):
+ return self.__db_config
+
def db_execute_query(self, query):
cur = self.__conection_object.cursor()
cur.execute(query)
@@ -36,20 +42,26 @@ class PgSQLConnection(object):
cur.close()
return data
+ def db_upload_large_file(self, filepath):
+ # a new connection must be created to use large objects
+ __conn = psycopg2.connect(**self.__db_config)
+ lobj = __conn.lobject(oid=0, mode="rw", new_oid=0, new_file=filepath)
+ fileoid = lobj.oid
+ lobj.close()
+ __conn.commit()
+ __conn.close()
+ return fileoid
+
def commit(self):
self.__conection_object.commit()
-
def db_close(self):
if self.__conection_object is not None:
self.__conection_object.close()
del self.__conection_object
self.__conection_object = None
-
def __del__(self):
if self.__conection_object is not None:
self.__conection_object.close()
- #print("disconnected")
-
-
+ # print("disconnected")
diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py
new file mode 100644
index 0000000..51e5247
--- /dev/null
+++ b/test-cli/test/helpers/qrreader.py
@@ -0,0 +1,123 @@
+import evdev
+from evdev import InputDevice, categorize, ecodes
+import threading
+import time
+import selectors
+from selectors import DefaultSelector, EVENT_READ
+from test.helpers.iseelogger import logObj
+
+selector = selectors.DefaultSelector()
+
+qrdevice_list = [
+ "Honeywell Imaging & Mobility 1900",
+ "Manufacturer Barcode Reader",
+ "SM SM-2D PRODUCT HID KBW",
+ "Honeywell Imaging & Mobility 1400g"
+]
+
+qrkey_list = {'KEY_0': '0',
+ 'KEY_1': '1',
+ 'KEY_2': '2',
+ 'KEY_3': '3',
+ 'KEY_4': '4',
+ 'KEY_5': '5',
+ 'KEY_6': '6',
+ 'KEY_7': '7',
+ 'KEY_8': '8',
+ 'KEY_9': '9'
+ }
+
+
+class QRReader:
+ __qrstr = ""
+ __myReader = {}
+ __numdigits = 10
+ __dev = None
+
+ def __init__(self):
+ self.getQRlist()
+
+ def getQRlist(self):
+ devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
+ logObj.getlogger().debug("{}".format(devices))
+ for device in devices:
+ logObj.getlogger().debug("{}".format(device.name))
+ if device.name in qrdevice_list:
+ self.__myReader['NAME'] = device.name
+ # print(self.__myReader['NAME'])
+ self.__myReader['PATH'] = device.path
+ # print(self.__myReader['PATH'])
+ self.__myReader['PHYS'] = device.phys
+ # print(self.__myReader['PHYS'])
+
+ def IsQR(self):
+ return 'NAME' in self.__myReader
+
+ def getQRNumber(self):
+ return self.__qrstr
+
+ def openQR(self):
+ if self.IsQR():
+ self.closeQR()
+ self.__dev = InputDevice(self.__myReader['PATH'])
+ return True
+ return False
+
+ def closeQR(self):
+ if self.__dev:
+ del self.__dev
+ self.__dev = None
+
+ def readQR(self):
+ """" Sync Read up to numdigits """
+ count = 0
+ self.__qrstr = ""
+ if self.__dev:
+ self.__dev.grab()
+ for event in self.__dev.read_loop():
+ if event.type == evdev.ecodes.EV_KEY:
+ c_event = categorize(event)
+ if c_event.keycode in qrkey_list:
+ if c_event.keystate == 0:
+ self.__qrstr += qrkey_list[c_event.keycode]
+ count += 1
+ if count == self.__numdigits:
+ break
+ self.__dev.ungrab()
+ return True
+ return False
+
+ def wait_event(self, timeout):
+ selector.register(self.__dev, selectors.EVENT_READ)
+ while True:
+ events = selector.select(timeout);
+ if not events:
+ return False
+ else:
+ return True
+
+ def readQRasync(self, timeout):
+ count = 0
+ r = False
+ self.__qrstr = ""
+ if self.__dev:
+ self.__dev.grab()
+ if self.wait_event(timeout):
+ for event in self.__dev.read_loop():
+ if event.type == evdev.ecodes.EV_KEY:
+ c_event = categorize(event)
+ if c_event.keycode in qrkey_list:
+ if c_event.keystate == 0:
+ self.__qrstr += qrkey_list[c_event.keycode]
+ count += 1
+ if count == self.__numdigits:
+ r = True
+ break
+ self.__dev.ungrab()
+ return r
+ return False
+
+# qr = QRReader()
+# if qr.openQR():
+# print(qr.readQRasync(2))
+# qr.closeQR()
diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py
new file mode 100644
index 0000000..8131a53
--- /dev/null
+++ b/test-cli/test/helpers/sdl.py
@@ -0,0 +1,126 @@
+import sdl2.ext
+from sdl2 import *
+import os
+import time
+
+Colors = {
+ 'red': sdl2.ext.Color(255, 0, 0, 255),
+ 'green': sdl2.ext.Color(0, 255, 0, 255),
+ 'blue': sdl2.ext.Color(0, 0, 255, 255),
+ 'white': sdl2.ext.Color(255, 255, 255, 255),
+ 'black': sdl2.ext.Color(0, 0, 0, 255),
+}
+
+
+class SDL2(object):
+ __resources = None
+ __w = 1280
+ __h = 720
+ __winName = "noname"
+ __window = None
+ __factory = None
+ __renderer = None
+ __srender = None
+
+ def __init__(self, driver, display, w, h):
+ os.environ['SDL_VIDEODRIVER'] = "x11"
+ os.environ['DISPLAY'] = ":0"
+ # self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files')
+ sdl2.ext.init()
+ self.__createWindow()
+
+ def __createWindow(self):
+ self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0),
+ flags=(
+ SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED))
+ self.__renderer = sdl2.ext.Renderer(self.__window)
+ self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer)
+ self.__srender = self.__factory.create_sprite_render_system(self.__window)
+ self.__window.show()
+ SDL_ShowCursor(SDL_DISABLE)
+
+ # def createFont(self, fontName, fontSize, fontColor):
+ # return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor)
+
+ def WriteText(self, text, x, y, fontManager):
+ imText = self.__factory.from_text(text, fontmanager=fontManager)
+ self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1]))
+
+ def show(self):
+ self.__window.show()
+
+ def hide(self):
+ self.__window.hide()
+
+ def setColor(self, red, green, blue, alpha):
+ self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha)
+
+ def fillColor(self, red, green, blue, alpha):
+ self.setColor(red, green, blue, alpha)
+ self.__renderer.clear()
+
+ def fillbgColor(self, color, update=False):
+ if color in Colors:
+ self.__renderer.color = Colors[color]
+ self.__renderer.clear()
+ if update:
+ self.update()
+
+ def update(self):
+ self.__renderer.present()
+
+ # def showImage(self, filename):
+ # im = self.__factory.from_image(self.__resources.get_path(filename))
+ # self.__srender.render(im)
+
+
+class SDL2_Test(object):
+ __sdl2 = None
+
+ def __init__(self, driver, display, w, h):
+ self.__sdl2 = SDL2(driver, display, w, h)
+
+ def Clear(self):
+ self.__sdl2.fillbgColor('black', True)
+
+ def Paint(self, dcolor):
+ self.Clear()
+ self.__sdl2.fillbgColor(dcolor.lower(), True)
+
+
+class SDL2_Message(object):
+ __sdl2 = None
+ __fontManager_w = None
+ __fontManager_b = None
+
+ def __init__(self):
+ self.__sdl2 = SDL2()
+ self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white'])
+ self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black'])
+
+ def Clear(self):
+ self.__sdl2.fillbgColor('black', True)
+
+ def Paint(self, dcolor):
+ self.Clear()
+ self.__sdl2.fillbgColor(dcolor.lower(), True)
+
+ def __write_w(self, x, y, text):
+ self.__sdl2.WriteText(text, x, y, self.__fontManager_w)
+
+ def __write_b(self, x, y, text):
+ self.__sdl2.WriteText(text, x, y, self.__fontManager_b)
+
+ def setTestOK(self, board_uuid, qrdata):
+ self.Paint('GREEN')
+ self.__write_b(100, 50, 'TEST OK')
+ self.__write_b(100, 100, 'uuid={}'.format(board_uuid))
+ self.__write_b(100, 150, 'qr={}'.format(qrdata))
+ self.__sdl2.update()
+
+ def setTestERROR(self, error):
+ self.Paint('RED')
+ self.__write_w(100, 50, 'TEST FAILED')
+ self.__write_w(100, 100, '{}'.format(error))
+ self.__sdl2.update()
+
diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py
index 3fd9fd5..d61d1fb 100644
--- a/test-cli/test/helpers/setup_xml.py
+++ b/test-cli/test/helpers/setup_xml.py
@@ -1,22 +1,22 @@
import xml.etree.ElementTree as XMLParser
class XMLSetup (object):
- """aaaaa"""
+ """XML Setup Parser"""
__tree = None # Parser
__dbType = None # database connection required: PgSQLConnection
__dbConnectionRaw = None # Connection string in raw
__dbConnectionStr = None # Connection string to use in sql object connection
def __init__(self, filename):
- """aaaaa"""
+ """Parse the file in the constructor"""
self.__tree = XMLParser.parse(filename)
def __del__(self):
- """aaaaa"""
+ """Destructor do nothing"""
pass
- def getdbConnectionStr (self):
- """aaaaa"""
+ def getdbConnectionStr(self):
+ """XML to database connection string"""
if self.__dbConnectionRaw is not None:
return self.__dbConnectionRaw
@@ -29,12 +29,44 @@ class XMLSetup (object):
return None
- def getPostgresConnectionStr (self):
- """aaaaa"""
+ def getPostgresConnectionStr(self):
+ """Get Connection string """
str = self.__dbConnectionRaw
del str['type']
return str
- def getMysqlConnectionStr (self):
+ def getBoard(self, key, default):
+ for element in self.__tree.iter('board'):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
+
+ def getTempPath(self, key, default):
+ for element in self.__tree.iter('tmpPath'):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
+
+ def getKeyVal(self, tag, key, default):
+ for element in self.__tree.iter(tag):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
+
+ def gettagKey (self, xmltag, xmlkey):
"""aaaaa"""
- pass \ No newline at end of file
+ for element in self.__tree.iter(xmltag):
+ return element.attrib[xmlkey]
+
+ return None
+
+ def getUSBlist(self):
+ list = {}
+ count = 0
+ for elements in self.__tree.iter("usbdev"):
+ sublist = {}
+ for key,val in elements.items():
+ sublist[key] = val
+ list[count] = sublist
+ count += 1
+ return list \ No newline at end of file
diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py
index b579e39..7223774 100644
--- a/test-cli/test/helpers/syscmd.py
+++ b/test-cli/test/helpers/syscmd.py
@@ -1,6 +1,6 @@
import unittest
import subprocess
-from test.helpers.globalVariables import globalVar
+import threading
class TestSysCommand(unittest.TestCase):
@@ -10,7 +10,7 @@ class TestSysCommand(unittest.TestCase):
__outdata = None
__outtofile = False
- def __init__(self, testname, testfunc, str_cmd, outtofile = False):
+ def __init__(self, testname, testfunc, str_cmd, outtofile=False):
""" init """
super(TestSysCommand, self).__init__(testfunc)
self.__str_cmd = str_cmd
@@ -42,7 +42,7 @@ class TestSysCommand(unittest.TestCase):
else:
res = -3
outdata = completed.stdout
- self.longMessage=str(outdata).replace("'","")
+ self.longMessage = str(outdata).replace("'", "")
self.assertTrue(True)
except subprocess.CalledProcessError as err:
self.assertTrue(False)
@@ -54,11 +54,14 @@ class TestSysCommand(unittest.TestCase):
def remove_file(self):
pass
+
class SysCommand(object):
__str_cmd = None
__cmdname = None
__outdata = None
__errdata = None
+ __returnCode = None
+ __exception = None
def __init__(self, cmdname, str_cmd):
""" init """
@@ -66,31 +69,37 @@ class SysCommand(object):
self.__cmdname = cmdname
def getName(self):
- return self.__testname
+ return self.__cmdname
+
+ def setName(self, cmdName):
+ self.__cmdname = cmdName
+
+ def getParams(self):
+ return self.__str_cmd
+
+ def setParam(self, params):
+ self.__str_cmd = params
def execute(self):
- res = -1
+ # try:
+ self.__outdata = None
+ self.__errdata = None
+ self.__exception = None
try:
- self.__outdata = None
- self.__errdata = None
completed = subprocess.run(
self.__str_cmd,
- check=True,
+ check=False,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
+ self.__returnCode = completed.returncode
self.__outdata = completed.stdout
- if completed.returncode is 0:
- res = 0
- if completed.stderr.decode('ascii') != "":
- res = -1
- self.__errdata = completed.stderr
+ self.__errdata = completed.stderr.decode('ascii')
except subprocess.CalledProcessError as err:
- res = -2
- except Exception as t:
- res = -3
- return res
+ self.__exception = err
+ self.__returnCode = -1
+ return self.__returnCode
def getOutput(self):
return self.__outdata
@@ -98,6 +107,12 @@ class SysCommand(object):
def getOutErr(self):
return self.__errdata
+ def getException(self):
+ return self.__exception
+
+ def IsException(self):
+ return self.__exception is not None
+
def getOutputlines(self):
return self.__outdata.splitlines()
@@ -106,3 +121,26 @@ class SysCommand(object):
f.write(self.__outdata)
f.close()
+
+class AsyncSys(threading.Thread):
+ sysObject = None
+ sysres = None
+
+ def __init__(self, threadID, name, counter):
+ threading.Thread.__init__(self)
+ self.threadID = threadID
+ self.name = name
+ self.counter = counter
+ self.sysObject = SysCommand(name, None)
+
+ def setParams(self, Params):
+ self.sysObject.setParam(Params)
+
+ def getRes(self):
+ return self.sysres
+
+ def getData(self):
+ return self.sysObject.getOutput()
+
+ def run(self):
+ self.sysres = self.sysObject.execute()
diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py
index 94181f9..8e96e8c 100644
--- a/test-cli/test/helpers/testsrv_db.py
+++ b/test-cli/test/helpers/testsrv_db.py
@@ -1,10 +1,10 @@
from test.helpers.psqldb import PgSQLConnection
-from test.helpers.setup_xml import XMLSetup
-def find_between( s, first, last ):
+
+def find_between(s, first, last):
try:
- start = s.index( first ) + len( first )
- end = s.index( last, start )
+ start = s.index(first) + len(first)
+ end = s.index(last, start)
return s[start:end]
except ValueError:
return ""
@@ -19,158 +19,242 @@ class TestSrv_Database(object):
def __init__(self):
pass
- def open (self, filename):
- '''Open database connection'''
- self.__xml_setup = XMLSetup(filename)
+ def open(self, xmlObj):
+ self.__xml_setup = xmlObj
self.__sqlObject = PgSQLConnection()
return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr())
- def create_board(self, processor_id, model_id, variant, bmac = None):
+ def getConfig(self):
+ return self.__sqlObject.getConfig()
+
+ def create_board(self, processor_id, model_id, variant, station, bmac=None):
'''create a new board'''
if bmac is None:
- sql = "SELECT isee.create_board('{}', '{}', '{}', NULL);".format(processor_id, model_id, variant)
+ sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', NULL, '{}');".format(processor_id, model_id,
+ variant,
+ station)
else:
- sql = "SELECT isee.create_board('{}', '{}', '{}', '{}');".format(processor_id, model_id, variant, bmac)
- #print('>>>' + sql)
+ sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', '{}', '{}');".format(processor_id, model_id,
+ variant,
+ bmac, station)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res[0][0];
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ print(r)
+ print(str(err))
return None
- def create_model(self, modid, variant, descr, tgid):
- '''create new model'''
- sql = "SELECT isee.create_model('{}', '{}', '{}', '{}')".format(modid, variant, descr, tgid)
- #print('>>>' + sql)
+ def get_tests_list(self, board_uuid):
+ '''get the board test list'''
+ sql = "SELECT * FROM isee.f_get_tests_list('{}')".format(board_uuid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res[0][0];
+ # print(res)
+ return res
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def create_test_definition(self, testname, testdesc, testfunc):
- '''Create a new definition and return definition id on fail (testname already exist) return -1'''
- sql = "SELECT isee.define_test('{}', '{}', '{}')".format(testname, testdesc, testfunc)
- #print('>>>' + sql)
+ def get_test_params_list(self, testid):
+ sql = "SELECT * FROM isee.f_get_test_params_list({})".format(testid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res[0][0];
+ # print(res)
+ return res
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def add_testdef_to_group(self, testgroupid, testname, testparam):
- '''Assign definition to group test return true on success or false if it fails'''
- sql = "SELECT isee.add_test_to_group('{}', '{}', '{}')".format(testgroupid, testname, testparam)
- #print('>>>' + sql)
+ def open_test(self, board_uuid):
+ sql = "SELECT * FROM isee.f_open_test('{}')".format(board_uuid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res[0][0];
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def getboard_test_list(self, board_uuid):
- '''get the board test list'''
- sql = "SELECT isee.gettestlist('{}')".format(board_uuid)
- #print('>>>' + sql)
+ def run_test(self, testid_ctl, testid):
+ sql = "SELECT isee.f_run_test({},{})".format(testid_ctl, testid)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def finish_test(self, testid_ctl, testid, newstatus, textresult):
+ sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult)
+ # print('>>>' + sql)
+ try:
+ print('finish_test => SQL {}'.format(sql))
+ self.__sqlObject.db_execute_query(sql)
+ return True
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ print('finish_test => {}'.format(err))
+ return False
+
+ def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype):
+ try:
+ # generate a new oid
+ fileoid = self.__sqlObject.db_upload_large_file(filepath)
+ # insert into a table
+ sql = "SELECT isee.f_upload_result_file({},{},'{}','{}','{}')".format(testid_ctl, testid, desc, fileoid,
+ mimetype)
+ # print('>>>' + sql)
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def get_task_variables_list(self, board_uuid):
+ sql = "SELECT * FROM isee.f_get_task_variables_list('{}')".format(board_uuid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def getboard_comp_test_list(self, board_uuid):
- '''get the board test list'''
- sql = "SELECT isee.gettestcompletelist('{}')".format(board_uuid)
- #print('>>>' + sql)
+
+ def get_plc_macaddr(self, board_uuid):
+ sql = "SELECT * FROM isee.f_get_plcmac(\'{}\')".format(board_uuid)
+ # print('>>>' + sql)
+ try:
+ res = self.__sqlObject.db_execute_query(sql)
+ return res, ''
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ print(r)
+ return None, r
+
+
+ def get_board_macaddr(self, board_uuid):
+ sql = "SELECT * FROM isee.f_get_board_macaddr('{}')".format(board_uuid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def open_testbatch(self, board_uuid):
- '''get the board test list'''
- sql = "SELECT isee.open_testbatch('{}')".format(board_uuid)
- #print('>>>' + sql)
+ def open_task(self, uuid):
+ sql = "SELECT * FROM isee.f_open_task('{}')".format(uuid)
+ # print('>>>' + sql)
try:
- res = str(self.__sqlObject.db_execute_query(sql)[0])
- res = res.replace('(', '')
- res = res.replace(')', '')
- res = res.replace(',', '')
- #print(res)
- return res;
+ res = self.__sqlObject.db_execute_query(sql)
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def add_test_to_batch(self, board_uuid, testid, testid_ctl, result, groupid, data):
- '''get the board test list'''
- sql = "SELECT isee.add_test_to_batch_c('{}','{}','{}','{}','{}','{}')".format(board_uuid, testid, testid_ctl, result, groupid, data)
- #print('>>>' + sql)
+ def create_task_result(self, taskid_ctl, name, newstatus, newinfo=None):
+ sql = "SELECT isee.f_create_task_result({},'{}','{}','{}')".format(taskid_ctl, name, newstatus, newinfo)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def update_taskctl_status(self, taskid_ctl, newstatus):
+ sql = "SELECT isee.f_update_taskctl_status({},'{}')".format(taskid_ctl, newstatus)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def set_factorycode(self, uuid, factorycode):
+ sql = "SELECT isee.f_set_factorycode('{}','{}')".format(uuid, factorycode)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def read_station_state(self, station):
+ sql = "SELECT * FROM station.getmystate('{}');".format(station)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def close_testbatch(self, board_uuid, testid_ctl):
- '''get the board test list'''
- sql = "SELECT isee.close_testbatch('{}','{}')".format(board_uuid, testid_ctl)
- #print('>>>' + sql)
+ def change_station_state(self, station, newstate):
+ sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station)
+ print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ print(r)
return None
- def update_set_test_row(self, nstation, testid_ctl, board_uuid, ctest, cstatus):
- '''get the board test list'''
- sql = "SELECT isee.update_set_test_row('{}','{}','{}','{}','{}')".format(nstation, testid_ctl ,board_uuid, ctest, cstatus)
- #print('>>>' + sql)
+ def get_setup_variable(self, skey):
+ sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def getboard_eeprom(self, board_uuid):
- '''get the board eeprom struct '''
- sql = "SELECT isee.getboard_eeprom('{}')".format(board_uuid)
- #print('>>>' + sql)
+ def get_setup_variable(self, skey , default):
+ sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey)
+ # print('>>>' + sql)
+ try:
+ res = self.__sqlObject.db_execute_query(sql)
+ # print(res)
+ return res[0][0]
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return default
+
+ def setDevelStationState(self, station, newState):
+ sql = "UPDATE station.station_state SET state = {} WHERE hostname={}".format(newState, station)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- \ No newline at end of file
diff --git a/test-cli/test/helpers/usb.py b/test-cli/test/helpers/usb.py
new file mode 100644
index 0000000..8f8848d
--- /dev/null
+++ b/test-cli/test/helpers/usb.py
@@ -0,0 +1,270 @@
+from test.helpers.syscmd import SysCommand
+from test.helpers.amper import Amper
+import scanf
+import os
+import json
+
+
+class USBDevices(object):
+ __dev_list = {}
+ __serial_converter = {}
+ __usb_disk = {}
+ __usb_amper = {}
+ __dev_list_filtered = {}
+ __usb_mstorage = {}
+ __xml_config = None
+
+ def __init__(self, xml):
+ self.__xml_config = xml
+ self.__getusblist()
+ self.__getSerialDevices()
+ self.__getAmper()
+ self.__getMassStorage()
+
+ def print_usb(self):
+ print("------ usb list -------")
+ print(json.dumps(self.__dev_list))
+ print("------ serial converter list -------")
+ print(self.__serial_converter)
+ print("------ usb disk list -------")
+ print(self.__usb_disk)
+ print("------ usb amper list -------")
+ print(self.__usb_amper)
+ print("------ usb mass storage list -------")
+ print(self.__usb_mstorage)
+ print("------ filtered list -------")
+ print(json.dumps(self.__dev_list_filtered))
+
+ def refresh(self):
+ self.__dev_list = {}
+ self.__serial_converter = {}
+ self.__usb_disk = {}
+ self.__usb_amper = {}
+ self.__usb_mstorage = {}
+ self.__dev_list_filtered = {}
+ self.__getusblist()
+ self.__getSerialDevices()
+ self.__getAmper()
+ self.__getMassStorage()
+
+ def getMassStorage(self):
+ return self.__usb_mstorage
+
+ def getAmper(self):
+ return self.__usb_amper
+
+ def __getAmper(self):
+ for c, s_items in self.__serial_converter.items():
+ for k,v in s_items.items():
+ if k.find("USB") != -1 and v == "Prolific_Technology_Inc._USB-Serial_Controller":
+ testAmper = Amper(port=k)
+ testAmper.open()
+ res = testAmper.hello()
+ if res:
+ self.__usb_amper = {'port': k, 'ver': testAmper.getVersion()}
+ testAmper.close()
+ if res:
+ return
+
+ def __getMassStorage(self):
+ for c, s_items in self.__usb_disk.items():
+ for k,v in s_items.items():
+ if k == "udev":
+ if v.get('ID_MODEL', None) == "File-Stor_Gadget" or v.get('ID_FS_LABEL', None) != "NODE_L031K6":
+ self.__usb_mstorage = {'disk': v.get('DEVNAME', ''), 'id': v.get('ID_FS_LABEL_ENC', '')}
+
+
+ def __check_ignore_usb_list(self, usb_list, dev_list):
+ count = 0
+ ignore = "0"
+ for _, l in usb_list.items():
+ for k, v in l.items():
+ if k == "ignore":
+ ignore = v
+ else:
+ value = dev_list.get(k, None)
+ if v == value:
+ count += 1
+ else:
+ count = 0
+ break
+ if count >= len(l) -1:
+ break
+
+ if count > 0:
+ if ignore == "1":
+ return True
+ return False
+
+
+ def __create_ignore_list(self, usb_list):
+ count = 1
+ for _, dev in self.__dev_list.items():
+ if not self.__check_ignore_usb_list(usb_list, dev):
+ self.__dev_list_filtered[count] = dev
+ count += 1
+
+ def __getusblist(self):
+ count = 0
+ res = open('/sys/kernel/debug/usb/devices', 'rb').read().decode()
+ list = {}
+ for i in res.split('\n'):
+ if i.find('T:') != -1:
+ if len(list) > 0:
+ count += 1
+ self.__dev_list[count] = list
+ list = self.__parse_T(i)
+ elif i.find('D:') != -1:
+ list = {**list, **self.__parse_D(i)}
+ elif i.find('P:') != -1:
+ list = {**list, **self.__parse_P(i)}
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_S(i)}
+ else:
+ """ Ignore all rest"""
+ pass
+
+ if count > 0:
+ count += 1
+ self.__dev_list[count] = list
+
+ self.__create_ignore_list(self.__xml_config.getUSBlist())
+
+
+ def __parse_T(self, str):
+ data = {}
+ res = scanf.scanf("T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c", str)
+ data["BUS"] = res[0]
+ data["LEV"] = res[1]
+ data["PRNT"] = res[2]
+ data["PORT"] = res[3]
+ data["CNT"] = res[4]
+ data["DEV"] = res[5].lstrip()
+ data["SPEED"] = res[6].rstrip()
+ data["MXCH"] = res[7].lstrip()
+ return data
+
+ def __parse_D(self, str):
+ data = {}
+ str += ' '
+ res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", str)
+ data["VER"] = res[0].lstrip()
+ data["CLASS"] = res[1]
+ return data
+
+ def __parse_P(self, str):
+ data = {}
+ str += ' '
+ res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", str)
+ data["VENDOR"] = res[0].lstrip()
+ data["PRODID"] = res[1]
+ return data
+
+ def __parse_S(self, str):
+ data = {}
+ if str.find('Manufacturer') != -1:
+ pos = str.find('Manufacturer=')
+ data["MANUFACTURER"] = str[pos + len('Manufacturer='):].rstrip()
+ elif str.find('Product') != -1:
+ pos = str.find('Product=')
+ data["PRODUCT"] = str[pos + len('Product='):].rstrip()
+ elif str.find('SerialNumber') != -1:
+ pos = str.find('SerialNumber=')
+ data["SERIAL"] = str[pos + len('SerialNumber='):].rstrip()
+ else:
+ pass
+ return data
+
+ def finddevicebytag(self, tag, str):
+ t = {}
+ count = 0
+ for i, k in self.__dev_list.items():
+ if tag in k:
+ if k[tag].find(str) != -1:
+ t[count] = k
+ count += 1
+ return t
+
+ def __getSerialDevices(self):
+ self.__serial_converter = {}
+ self.__usb_disk = {}
+ self.__usb_amper = {}
+ test_myPath = os.path.dirname(os.path.abspath(__file__))
+ test_myPath = test_myPath + "/../"
+ s = SysCommand('usb', '{}/scripts/usb.sh'.format(test_myPath))
+ s.execute()
+ data_list = s.getOutput().decode()
+ for i in data_list.split('\n'):
+ if len(i) > 0:
+ pos_div = i.find('-')
+ dev = i[0:pos_div - 1]
+ devName = i[pos_div + 2:]
+ # print("dev: {}".format(dev))
+ # print("devName: {}".format(devName))
+ res = scanf.scanf("/dev/sd%c", dev)
+ if res is not None:
+ dev_id = res[0][0]
+ # Now verify if is Disk or Partition
+ res = scanf.scanf("/dev/sd%c%c", dev)
+ if res is not None:
+ # this is a partition and we can ignore it
+ pass
+ else:
+ self.__usb_disk[dev_id] = {dev: devName, 'udev': self.__getudevquery(dev)}
+ else:
+ res = scanf.scanf("/dev/ttyUSB%c", dev)
+ if res is not None:
+ serial_id = res[0][0]
+ self.__serial_converter[serial_id] = {dev: devName, 'udev': self.__getudevquery(dev)}
+
+ def __getudevquery(self, device):
+ list = {}
+ s = SysCommand('query-udev', 'udevadm info --query=all -n {}'.format(device))
+ res = s.execute()
+ if res != 0:
+ return None
+ for i in s.getOutput().decode().split('\n'):
+ if i.find('P:') != -1:
+ list = self.__parse_udev_P(i)
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_udev_S(i)}
+ elif i.find('N:') != -1:
+ list = {**list, **self.__parse_udev_N(i)}
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_udev_S(i)}
+ elif i.find('E:') != -1:
+ list = {**list, **self.__parse_udev_E(i)}
+ return list
+
+ def __parse_udev_P(self, str):
+ pos = str.find('P: ')
+ str = str[len('P: ') + pos:]
+ return {'PATH': str}
+
+ def __parse_udev_S(self, str):
+ pos = str.find('P: ')
+ str = str[len('P: ') + pos:]
+ pos = str.find('disk/by-id/')
+ if pos != -1:
+ return {'DISK_BY_ID': str[len('disk/by-id/') + pos:]}
+ pos = str.find('disk/by-path/')
+ if pos != -1:
+ return {'DISK_BY_PATH': str[len('disk/by-path/') + pos:]}
+ pos = str.find('disk/by-uuid/')
+ if pos != -1:
+ return {'DISK_BY_UUID': str[len('disk/by-uuid/') + pos:]}
+ else:
+ return {}
+
+ def __parse_udev_N(self, str):
+ pos = str.find('N: ')
+ str = str[len('N: ') + pos:]
+ return {'DEV_NAME': str}
+
+ def __parse_udev_E(self, str):
+ pos = str.find('E: ')
+ str = str[len('E: ') + pos:]
+ pos = str.find('=')
+ str_key = str[:pos]
+ str_value = str[pos + 1:]
+ return {str_key: str_value}
diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py
new file mode 100644
index 0000000..8d2c27b
--- /dev/null
+++ b/test-cli/test/helpers/utils.py
@@ -0,0 +1,68 @@
+import json
+import socket
+import os
+import scanf
+
+def save_json_to_disk(filePath, description, mime, json_data, result):
+ with open(filePath, 'w') as outfile:
+ json.dump(json_data, outfile, indent=4)
+ outfile.close()
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+def save_file_to_disk(filePath, description, mime, data, result):
+ with open(filePath, 'w') as outfile:
+ outfile.write(data)
+ outfile.close()
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+def save_result (filePath, description, mime, result):
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+
+def str2bool(v):
+ return v.lower() in ("yes", "true", "t", "1")
+
+def station2Port(base):
+ Name = socket.gethostname()
+ res = scanf.scanf("Station%d", Name)
+ if res[0]:
+ NmBase = int(base)
+ res = int(res[0])
+ return str(NmBase + res)
+ return base
+
+
+def sys_read(sysnode):
+ try:
+ f = open(sysnode, "r")
+ data = f.readline()
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, data
+
+def removefileIfExist(fname):
+ if os.path.exists(fname):
+ try:
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''
diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py
index 5b2da5a..c110b1c 100644
--- a/test-cli/test/runners/simple.py
+++ b/test-cli/test/runners/simple.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-
"""
Simple Test Runner for unittest module
@@ -7,10 +5,7 @@ Simple Test Runner for unittest module
import sys
import unittest
-import os
-from test.helpers.globalVariables import globalVar
-from test.helpers.testsrv_db import TestSrv_Database
-
+from test.helpers.iseelogger import logObj
class SimpleTestRunner:
@@ -22,77 +17,78 @@ class SimpleTestRunner:
---------------------------------------------
"""
- def __init__(self, stream=sys.stderr, verbosity=0):
+ __pgObj = None
+
+ def __init__(self, pgObj, stream=sys.stderr, verbosity=0):
self.stream = stream
self.verbosity = verbosity
+ self.__pgObj = pgObj
def writeUpdate(self, message):
self.stream.write(message)
def run(self, test):
- """ Run the given test case or Test Suite.
-
- """
- result = TextTestResult(self)
+ """ Run the given test case or Test Suite. """
+ result = TextTestResult(self, self.__pgObj)
test(result)
- result.testsRun
- # self.writeUpdate("---------------------------------------------\n")
return result
+
class TextTestResult(unittest.TestResult):
# Print in terminal with colors
PASS = '\033[32mPASS\033[0m\n'
FAIL = '\033[31mFAIL\033[0m\n'
ERROR = '\033[31mERROR\033[0m\n'
- def __init__(self, runner):
+ __pgObj = None
+
+ def __init__(self, runner, pgObj):
unittest.TestResult.__init__(self)
self.runner = runner
self.result = self.ERROR
+ self.__pgObj = pgObj
def startTest(self, test):
unittest.TestResult.startTest(self, test)
self.runner.writeUpdate("%s : " % test.shortDescription())
# SEND TO DB THE UPDATE THAT WE RUN EACH TEST
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING")
+ self.__pgObj.run_test(test.params["testidctl"], test.params["testid"])
def addSuccess(self, test):
unittest.TestResult.addSuccess(self, test)
- self.result=self.PASS
+ self.result = self.PASS
def addError(self, test, err):
unittest.TestResult.addError(self, test, err)
test.longMessage = err[1]
self.result = self.ERROR
+ print(err)
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
- test.longMessage=err[1]
+ test.longMessage = err[1]
self.result = self.FAIL
+ logObj.getlogger().info('{}:{}'.format(test, err))
def stopTest(self, test):
- unittest.TestResult.stopTest(self, test)
- # display: print test result
- self.runner.writeUpdate(self.result)
- # DB: PREPARE THE DATA TO BE INSERTED
- dbdata = {}
- dbdata['uuid'] = globalVar.g_uuid
- dbdata['name'] = test.shortDescription()
- dbdata['result'] = self.result
- dbdata['msg'] = str(test.longMessage)
- #DB: INSERT IN THE DATABASE
- filename='test_results.dat'
- testResult = open(filename, 'a')
- testResult.write(str(dbdata))
- testResult.write("\n")
- testResult.close()
- #CONVERT FANCY FAIL AND PASS
- if self.result==self.PASS: simple_result="TRUE"
- if self.result == self.FAIL: simple_result = "FALSE"
- elif self.result == self.ERROR: simple_result = "FALSE"
- #SEND TO DB THE RESULT OF THE TEST
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- psdb.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage)
+ try:
+ unittest.TestResult.stopTest(self, test)
+ # display: print test result
+ self.runner.writeUpdate(self.result)
+ # save result data
+ for result in test.getresults():
+ self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"],
+ result["description"], result["filepath"], result["mimetype"])
+ # SEND TO DB THE RESULT OF THE TEST
+ if self.result == self.PASS:
+ status = "TEST_COMPLETE"
+ resulttext = test.gettextresult()
+ logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext))
+ else:
+ status = "TEST_FAILED"
+ resulttext = test.longMessage
+ logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext))
+ if not self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext):
+ self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, "")
+ except Exception as E:
+ logObj.getlogger().error('Exception: [{}]{}:{}:{}:{}'.format(E, test.params["testidctl"], test.params["testid"], status, resulttext)) \ No newline at end of file
diff --git a/test-cli/test/scripts/__init__.py b/test-cli/test/scripts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test-cli/test/scripts/__init__.py
diff --git a/test-cli/test/scripts/usb.sh b/test-cli/test/scripts/usb.sh
new file mode 100755
index 0000000..52ea4b2
--- /dev/null
+++ b/test-cli/test/scripts/usb.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+for sysdevpath in $(find /sys/bus/usb/devices/usb*/ -name dev); do
+ (
+ syspath="${sysdevpath%/dev}"
+ devname="$(udevadm info -q name -p $syspath)"
+ [[ "$devname" == "bus/"* ]] && continue
+ eval "$(udevadm info -q property --export -p $syspath)"
+ [[ -z "$ID_SERIAL" ]] && continue
+ echo "/dev/$devname - $ID_SERIAL"
+ )
+done
+
+exit 0
diff --git a/test-cli/test/tasks/__init__.py b/test-cli/test/tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test-cli/test/tasks/__init__.py
diff --git a/test-cli/test/tasks/flasheeprom.py b/test-cli/test/tasks/flasheeprom.py
new file mode 100644
index 0000000..775c556
--- /dev/null
+++ b/test-cli/test/tasks/flasheeprom.py
@@ -0,0 +1,77 @@
+import os
+import binascii
+from test.helpers.globalVariables import globalVar
+
+class EEprom_Flasher:
+ __Name = None
+ __varlist = None
+ __xmlObj = None
+ __lastError = ''
+
+ def __init__(self, varlist):
+ self.__varlist = varlist
+ self.__xmlObj = varlist['xml']
+ self.__Name = varlist.get('name_eeprom', self.__xmlObj.getKeyVal('EEProm_Flasher', 'name', 'EEProm_Flasher'))
+ # self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash')
+ # self.__ImagePath = varlist.get('flashimagepath', '')
+ # self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1'))
+
+ def getTaskName(self):
+ return self.__Name
+
+ def getError(self):
+ return self.__lastError
+
+ def Execute(self):
+ return True, None
+
+ def __generate_data_bytes(self, boarduuid, mac0, mac1):
+ data = bytearray()
+ data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E
+ data += bytearray([0, 0, 0, 0]) # crc32 = 0
+ data += bytearray(boarduuid + "\0",
+ 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end
+ data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7'
+ if mac1 is not None:
+ data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7'
+ else:
+ data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0
+ # calculate crc
+ crc = (binascii.crc32(data, 0)).to_bytes(4, 'big')
+ data[4:8] = crc
+ return data
+
+
+ def __generate_output_data(self, data_rx):
+ boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql
+ mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii')
+ mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii')
+ smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2))
+ smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2))
+ eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1
+ return eepromdata
+
+
+ # returns exitcode and data saved into eeprom
+ def flash_eeprom(self, eeprompath, boarduuid, mac0, mac1=None):
+ print("Start programming Eeprom...")
+ # check if eeprompath is correct
+ if os.path.isfile(eeprompath):
+ # create u-boot data struct
+ data = self.__generate_data_bytes(boarduuid, mac0, mac1)
+ # write into eeprom and read back
+ f = open(eeprompath, "r+b")
+ f.write(data)
+ f.seek(0)
+ data_rx = f.read(57)
+ for i in range(57):
+ if data_rx[i] != data[i]:
+ print("Error while programming eeprom memory.")
+ return 1, None
+ print("Eeprom programmed succesfully.")
+ # generated eeprom read data
+ eepromdata = self.__generate_output_data(data_rx)
+ return 0, eepromdata
+ else:
+ print("Eeprom memory not found.")
+ return 1, None
diff --git a/test-cli/test/tasks/flashmemory.py b/test-cli/test/tasks/flashmemory.py
new file mode 100644
index 0000000..ad9d92a
--- /dev/null
+++ b/test-cli/test/tasks/flashmemory.py
@@ -0,0 +1,44 @@
+import sh
+import os
+
+class NAND_Flasher:
+ __Name = None
+ __varlist = None
+ __xmlObj = None
+ __igepflashPath = None
+ __lastError = ''
+
+ def __init__(self, varlist):
+ self.__varlist = varlist
+ self.__xmlObj = varlist['xml']
+ self.__Name = varlist.get('name_flashnand', self.__xmlObj.getKeyVal('NAND_Flasher', 'name', 'NAND_Flasher'))
+ self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash')
+ self.__ImagePath = varlist.get('flashimagepath', '')
+ self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1'))
+
+ def getTaskName(self):
+ return self.__Name
+
+ def getError(self):
+ return self.__lastError
+
+ def Execute(self):
+ try:
+ self.__lastError = ''
+ if not os.path.isfile(self.__ImagePath):
+ self.__lastError('Not Image Found: {}'.format(self.__ImagePath))
+ return False, None
+ if self.__SkipNandtest == '1':
+ p = sh.bash('{}'.format(self.__igepflashPath), "--skip-nandtest", "--image", self.__ImagePath)
+ else:
+ p = sh.bash('{}'.format(self.__igepflashPath), "--image", self.__ImagePath)
+ if p.exit_code != 0:
+ return False, None
+ except OSError as e:
+ self.__lastError('Exception: {}'.format(e.strerror))
+ return False, None
+ except Exception as Error:
+ self.__lastError('Exception: {}'.format(Error))
+ return False, None
+ return True, None
+
diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py
deleted file mode 100644
index 9fcd6d2..0000000
--- a/test-cli/test/tests/qamp.py
+++ /dev/null
@@ -1,87 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-import serial
-import time
-
-class Qamp(unittest.TestCase):
-
- def __init__(self, testname, testfunc, undercurrent=0.1, overcurrent=2):
- self._current = 0.0
- self._undercurrent=undercurrent
- self._overcurrent = overcurrent
- self._vshuntfactor=16384.0
- self._ser = serial.Serial()
- self._ser.port = '/dev/ttyUSB0'
- self._ser.baudrate = 115200
- self._ser.parity = serial.PARITY_NONE
- self._ser.timeout = 0
- self._ser.writeTimout = 0
- self._ser.xonxoff = False
- self._ser.rtscts = False
- self._ser.dsrdtr = False
- super(Qamp, self).__init__(testfunc)
- self._testMethodDoc = testname
-
- def __del__(self):
- self._ser.close()
-
- def execute(self):
- # Open Serial port ttyUSB0
- error=0
- try:
- self._ser.open()
- except:
- self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port))
- error=1
- return -1
- if error==0:
- # Clean input and output buffer of serial port
- self._ser.flushInput()
- self._ser.flushOutput()
- # Send command to read Voltage at Shunt resistor
- # Prepare cmd
- cmd = ('at\n\r')
- i=0
- while (i < len(cmd)):
- i = i + self._ser.write(cmd[i].encode('ascii'))
- time.sleep(0.05)
- self._ser.read(1)
- res0 = []
- while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer
- res0.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII
- print(res0)
- #CHECK COM FIRST
- cmd = ('at+in?\n\r')
- i = 0
-
- # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms.
- while (i < len(cmd)):
- i = i + self._ser.write(cmd[i].encode('ascii'))
- time.sleep(0.05)
- self._ser.read(1)
-
- # Read, 1 by 1 byte
- res = []
- while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer
- res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII
-
- string = ''.join(res)
- string = string.replace('\n', '')
- try:
- self._current = float(int(string, 0)) / self._vshuntfactor
- except:
- self.fail("failed: CAN'T READ CONSUMPTION (CURRENT=0?)")
- error=1
- return -1
- if error==0:
- print(self._current)
- # In order to give a valid result it is importarnt to define an under current value
- if (self._current > float(self._overcurrent)):
- self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current))
- return -1
-
- if (self._current < float(self._undercurrent)):
- self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current))
- return -1
-
-
diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py
new file mode 100644
index 0000000..96b673c
--- /dev/null
+++ b/test-cli/test/tests/qamper.py
@@ -0,0 +1,74 @@
+import unittest
+from test.helpers.amper import Amper
+
+
+class Qamper(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ dummytest = False
+
+ # varlist: undercurrent, overcurrent
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qamper, self).__init__(testfunc)
+
+ if "undercurrent" in varlist:
+ self._undercurrent = varlist["undercurrent"]
+ else:
+ raise Exception('undercurrent param inside Qamp must be defined')
+ if "overcurrent" in varlist:
+ self._overcurrent = varlist["overcurrent"]
+ else:
+ raise Exception('overcurrent param inside Qamp must be defined')
+ self._testMethodDoc = testname
+ self.__resultlist = []
+ if "dummytest" in varlist:
+ self.dummytest = varlist["dummytest"]
+
+ def saveresultinfile(self, currentvalue):
+ # save result in a file
+ with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile:
+ n = outfile.write("Current: {} A".format(currentvalue))
+ outfile.close()
+ self.__resultlist.append(
+ {
+ "description": "Amperimeter values",
+ "filepath": "/mnt/station_ramdisk/amper.txt",
+ "mimetype": "text/plain"
+ }
+ )
+
+ def execute(self):
+ if self.dummytest == '1':
+ self.current = "0.0"
+ self.saveresultinfile(self.current)
+ return
+
+ amp = Amper()
+ # open serial connection
+ if not amp.open():
+ self.fail("Error: can not open a serial port")
+ # check if the amperimeter is connected and working
+ # 2 ATTEMTS
+ if not amp.hello():
+ if not amp.hello():
+ self.fail("Error: can not communicate")
+ # get current value (in Amperes)
+ self.current = amp.getCurrent()
+ # save result in a file
+ self.saveresultinfile(self.current)
+ # close serial connection
+ amp.close()
+ # Check current range
+ if float(self.current) > float(self._overcurrent):
+ # Overcurrent detected
+ self.fail("failed: Overcurrent detected ( {} )".format(self.current))
+ elif float(self.current) < float(self._undercurrent):
+ # Undercurrent detected
+ self.fail("failed: Undercurrent detected ( {} )".format(self.current))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return "{} A".format(self.current)
diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py
index a1572ca..a219a1a 100644
--- a/test-cli/test/tests/qaudio.py
+++ b/test-cli/test/tests/qaudio.py
@@ -1,37 +1,100 @@
-from test.helpers.syscmd import SysCommand
import unittest
-#class name
+import sh
+import wave
+import contextlib
+import uuid
+import time
+from test.helpers.utils import save_result
+from test.helpers.iseelogger import logObj
+
class Qaudio(unittest.TestCase):
- # Initialize the variables
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __dtmf_secuence = ["1", "3", "5", "7", "9"]
+ __dtmf_file = None
+ __file_uuid = None
+ __QaudioName = None
- def __init__(self, testname, testfunc, dtmfFile):
- # Doing this we will initialize the class and later on perform a particular method inside this class
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qaudio, self).__init__(testfunc)
self._testMethodDoc = testname
- self._dtmfFile=dtmfFile
- self.__sum=0
- self.__refSum = 25 # 1+3+5+7+9
+ self.__resultlist = []
+ self.__xmlObj = varlist["xml"]
+ self.__QaudioName = varlist.get('name', 'qaudio')
+ self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav"))
+ self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15"))
+ self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav"))
+ self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000"))
+ self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk"))
+ self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0")))
+ self.__file_uuid = uuid.uuid4()
+ self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid)
+
+ def restoreAlsaCtl(self):
+ try:
+ sh.alsactl('restore');
+ except Exception as E:
+ logObj.getlogger().error("Failed to Execite alsactl restore");
def execute(self):
- str_cmd = "aplay test/files/dtmf-13579.wav 2> /dev/null & arecord -r 8000 -d 1 recorded.wav 2> /dev/null" #.format(self.__dtmfFile)
- audio_loop = SysCommand("command-name", str_cmd)
- if audio_loop.execute() == 0:# BUG: Returns -1 but work
- lines = audio_loop.getOutput().splitlines()
- str_cmd = "multimon -t wav -a DTMF recorded.wav -q 2> /dev/null"
- dtmf_decoder = SysCommand("command-name", str_cmd)
- if dtmf_decoder.execute() == 0: # BUG: Returns -1 but work
- self.__raw_out = dtmf_decoder.getOutput()
- if self.__raw_out == "":
- return -1
- lines = dtmf_decoder.getOutput().splitlines()
- for i in range(0, 5):
- aux=[int(s) for s in lines[i].split() if s.isdigit()]
- self.__sum=self.__sum+aux[0]
- self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum)
+ self.restoreAlsaCtl()
+ try:
+ # analize audio file
+ calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1)
+ except TypeError as Error:
+ self.fail("failed: TypeError: {}.".Error)
+
+ # previous play to estabilise audio lines
+ p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True)
+ time.sleep(self.__run_delay)
+ p1 = sh.aplay(self.__dtmf_file_play)
+ p2.wait()
+
+ p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True)
+ time.sleep(self.__run_delay)
+ p1 = sh.aplay(self.__dtmf_file_play)
+ p2.wait()
+ if p1.exit_code == 0 and p2.exit_code == 0:
+ p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q")
+ if p.exit_code == 0:
+ lines = p.stdout.decode('ascii').splitlines()
+ self.__dtmf_secuence_result = []
+ for li in lines:
+ if li.split(" ")[0] == "DTMF:":
+ self.__dtmf_secuence_result.append(li.split(" ")[1])
+ self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result)))
+ # compare original and processed dtmf sequence
+ if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result):
+ for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result):
+ if a != b:
+ logObj.getlogger().debug(
+ "failed: sent and received DTMF sequence do not match.{} != {}".format(
+ self.__dtmf_secuence, self.__dtmf_secuence_result))
+ self.fail("failed: sent and received DTMF sequence do not match.")
+ save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record),
+ description='sound record',
+ mime='audio/x-wav',
+ result=self.__resultlist)
+ else:
+ logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result))
+ self.fail("received DTMF sequence is shorter than expected")
else:
- self.fail("failed: fail reading recorded file")
- return -1
+ self.fail("failed: unable to use multimon command.")
else:
- self.fail("failed: fail playing/recording file")
- return -1
- return 0
+ self.fail("failed: unable to play/record audio.")
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
+
+ def calc_audio_duration(self, fname):
+ duration = 0.0
+ with contextlib.closing(wave.open(fname, 'r')) as f:
+ frames = f.getnframes()
+ rate = f.getframerate()
+ duration = frames / float(rate)
+ f.close()
+ return duration
diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py
deleted file mode 100644
index 72a897e..0000000
--- a/test-cli/test/tests/qbutton.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-import uuid
-import time
-
-class Qbutton(unittest.TestCase):
-
- def __init__(self, testname, testfunc, gpio):
- if gpio == "SOPA":
- super(Qbutton, self).__init__("buttonSopa")
- else:
- super(Qbutton, self).__init__("buttonGpio")
- self._testMethodDoc = testname
-
- def buttonGpio(self):
- print("normal-button-test-using-gpio")
- self.fail("failed: GPIO BUTTON FAIL")
-
- def buttonSopa(self):
- str_cmd = "i2cset -f -y 1 0x2d 0x40 0x31"
- disable_pmic = SysCommand("disable_pmic", str_cmd)
- disable_pmic.execute()
- # BUG: REPEAT THIS EXECUTION TWICE BECAUSE FIRST TIME IT RETURNS AN ERROR
- led_on="echo 1 > /sys/class/leds/red\:usbhost/brightness"
- ledon = SysCommand("led_on", led_on)
- ledon.execute()
- time.sleep(0.1)
- disable_pmic.execute()
- if disable_pmic.execute() == 0:
- str_cmd = "i2cset -f -y 1 0x2d 0x50 0xff"
- reset_button = SysCommand("reset_button", str_cmd)
- if reset_button.execute() == 0:
- str_cmd = "i2cget -f -y 1 0x2d 0x50"
- get_button_val = SysCommand("get_button_val", str_cmd)
- print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n")
- timeout = 0
- while timeout < 20:
- if get_button_val.execute() == 0:
- get_button_val.execute()
- button_value = get_button_val.getOutput()
- button_value=button_value.decode('ascii').split("x")
- if int(button_value[1]) == 4:
- timeout = 20
- time.sleep(0.5)
- timeout = timeout + 1
- if timeout==20 and int(button_value[1]) == 0:
- self.fail("failed: timeout exceeded")
- else:
- timeout = 20
- self.fail("failed: not button input")
- else:
- self.fail("failed: could not complete i2c reset button state")
- else:
- self.fail("failed: could not complete i2c disable PMIC")
diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py
new file mode 100644
index 0000000..39a5047
--- /dev/null
+++ b/test-cli/test/tests/qdmesg.py
@@ -0,0 +1,32 @@
+import sh
+import os
+import os.path
+from os import path
+
+
+class Qdmesg:
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ self.__testMethodDoc = testname
+ self.__resultlist = []
+ self.pgObj = varlist["db"]
+ self.__xmlObj = varlist["xml"]
+ self.__QdmesgName = varlist.get('name', 'qdmesg')
+ self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg",
+ "/var/log/kern.log"))
+
+ def getTestName(self):
+ return self.__testMethodDoc
+
+ def execute(self):
+ self.pgObj.run_test(self.params["testidctl"], self.params["testid"])
+ if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)):
+ self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "")
+ return False
+ self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file),
+ "{}".format(self.__syslog_dmesg_file), "text/plain")
+ self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "")
+ return True
diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py
index 98bda81..170039b 100644
--- a/test-cli/test/tests/qduplex_ser.py
+++ b/test-cli/test/tests/qduplex_ser.py
@@ -1,46 +1,79 @@
-from test.helpers.syscmd import SysCommand
import unittest
import uuid
import serial
import time
+
class Qduplex(unittest.TestCase):
+ params = None
+ __port1 = None
+ __port2 = None
+ __baudrate = None
+ __resultlist = None # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, port1, port2, baudrate):
+ # varlist: port1, port2, baudrate
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qduplex, self).__init__(testfunc)
- self.__port1 = port1
- self.__serial1 = serial.Serial(self.__port1, timeout=1)
- self.__serial1.baudrate = baudrate
-
- self.__port2 = port2
- self.__serial2 = serial.Serial(self.__port2, timeout=1)
- self.__serial2.baudrate = baudrate
+ if "port1" in varlist:
+ self.__port1 = varlist["port1"]
+ else:
+ raise Exception('port1 param inside Qduplex must be defined')
+ if "port2" in varlist:
+ self.__port2 = varlist["port2"]
+ else:
+ raise Exception('port2 param inside Qduplex must be defined')
+ if "baudrate" in varlist:
+ self.__baudrate = varlist["baudrate"]
+ else:
+ raise Exception('baudrate param inside Qduplex must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
+
+ # open serial connection
+ self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1)
+ self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1)
def __del__(self):
self.__serial1.close()
self.__serial2.close()
def execute(self):
+ # generate a random uuid
+ test_uuid = str(uuid.uuid4()).encode()
+
+ # clean serial ports
self.__serial1.flushInput()
self.__serial1.flushOutput()
self.__serial2.flushInput()
self.__serial2.flushOutput()
- test_uuid = str(uuid.uuid4()).encode()
+ # send the uuid through serial port
self.__serial1.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial2.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port2))
else:
- if (self.__serial2.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port2))
+ if self.__serial2.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port2))
- test_uuid = str(uuid.uuid4()).encode()
+ time.sleep(0.05) # there might be a small delay
+
+ # clean serial ports
+ self.__serial1.flushInput()
+ self.__serial1.flushOutput()
+ self.__serial2.flushInput()
+ self.__serial2.flushOutput()
+ # send the uuid through serial port
self.__serial2.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial1.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port1))
else:
- if (self.__serial1.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port1))
-
+ if self.__serial1.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port1))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py
index f72f78f..7900381 100644
--- a/test-cli/test/tests/qeeprom.py
+++ b/test-cli/test/tests/qeeprom.py
@@ -1,38 +1,68 @@
-from test.helpers.syscmd import SysCommand
import unittest
-import uuid
+import os
class Qeeprom(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __QEepromName = None
+ __i2cBus = None
+ __device = None
- def __init__(self, testname, testfunc):
+ # varlist: position, eeprompath
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qeeprom, self).__init__(testfunc)
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QEepromName = varlist.get('name', 'qeeprom')
+ self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050"))
+ self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0"))
+ self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress)
+
+ self.__resultlist = []
+
+ def __check_device(self):
+ if os.path.isfile(self.__device):
+ return True
+ return False
+
+ def __eeprom_write(self, data):
+ try:
+ f = open(self.__device, "wb")
+ f.write(data)
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, ''
+
+ def __eeprom_read(self, size):
+ try:
+ f = open(self.__device, "rb")
+ data = f.read(size)
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, data
+
def execute(self):
- str_cmd = "find /sys/ -iname 'eeprom'"
- eeprom_location = SysCommand("eeprom_location", str_cmd)
- if eeprom_location.execute() == 0:
- self.__raw_out = eeprom_location.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?")
- return -1
- eeprom=self.__raw_out.decode('ascii')
- test_uuid = uuid.uuid4()
- str_cmd="echo '{}' > {}".format(str(test_uuid), eeprom)
- eeprom_write = SysCommand("eeprom_write", str_cmd)
- if eeprom_write.execute() == 0:
- self.__raw_out = eeprom_write.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to write on the EEPROM?")
- return -1
- str_cmd = "head -2 {}".format(eeprom)
- eeprom_read = SysCommand("eeprom_read", str_cmd)
- if eeprom_read.execute() == 0:
- self.__raw_out = eeprom_read.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to read from the EEPROM?")
- return -1
- if(str(self.__raw_out).find(str(test_uuid)) == -1):
- self.fail("failed: READ/WRITE mismatch")
- else:
- self.fail("failed: could not complete find eeprom command")
+ if not self.__check_device():
+ self.fail("cannot open device {}".format(self.__device))
+
+ data = bytes('AbcDeFgHijK098521', 'utf-8')
+
+ v, w = self.__eeprom_write(data)
+ if not v:
+ self.fail("eeprom: {} write test failed".format(w))
+ v, r = self.__eeprom_read(len(data))
+ if not v:
+ self.fail("eeprom: {} read test failed".format(r))
+ if r != data:
+ self.fail("mismatch between write and read bytes")
+
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py
index 2ac447c..7d081a1 100644
--- a/test-cli/test/tests/qethernet.py
+++ b/test-cli/test/tests/qethernet.py
@@ -1,57 +1,124 @@
-from test.helpers.syscmd import SysCommand
import unittest
-
+import sh
+import json
+import time
+import uuid
+import iperf3
+import netifaces
+from test.helpers.utils import save_json_to_disk
+from test.helpers.utils import station2Port
class Qethernet(unittest.TestCase):
- __sip = None
- __raw_out = None
- __MB_req = None
- __MB_real = None
- __BW_real = None
- __dat_list = None
- __bind = None
- __OKBW = None
-
- def __init__(self, testname, testfunc, sip = None, OKBW=100, bind=None):
+ __serverip = None
+ __numbytestx = None
+ __bwexpected = None
+ __port = None
+ params = None
+ __bwreal = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ timebetweenattempts = 5
+
+ # varlist content: serverip, bwexpected, port
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qethernet, self).__init__(testfunc)
- if sip is not None:
- self.__sip = sip
- if sip is not None:
- self.__bind = bind
- self.__MB_req = '10'
- self.__OKBW=OKBW
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QEthName = varlist.get('name', 'qeth')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1"))
+ self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000"))
+ self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0"))
+ self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost"))
+ self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10"))
+ self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk"))
+ self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5"))
+ self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10"))
+ self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json"))
+ self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json"))
+ self.__file_uuid = uuid.uuid4()
+ self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid)
+ self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid)
+
+ self.__resultlist = []
+
+ def checkInterface(self, reqInterface):
+ ifaces = netifaces.interfaces()
+ for t in ifaces:
+ if t == reqInterface:
+ return True
+ return False
def execute(self):
- print
- if self.__bind is None:
- str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req)
- else:
- str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind)
- iperf_command = SysCommand("iperf", str_cmd)
- if iperf_command.execute() == 0:
- self.__raw_out = iperf_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines = iperf_command.getOutput().splitlines()
- dat = lines[1]
- dat = dat.decode('ascii')
- dat_list = dat.split( )
- for d in dat_list:
- a = dat_list.pop(0)
- if a == "sec":
- break
- self.__MB_real = dat_list[0]
- self.__BW_real = dat_list[2]
- self.__dat_list = dat_list
- #print(self.__MB_real)
- #print(self.__BW_real)
- self.failUnless(float(self.__BW_real)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(self.__BW_real))
+ self.__resultlist = []
+ # check interfaces
+ if not self.checkInterface(self.__interface):
+ self.fail('Interface not found: {}'.format(self.__interface))
+
+ # Start Iperf
+ client = iperf3.Client()
+ client.duration = int(self.__duration)
+ client.server_hostname = self.__serverip
+ client.port = station2Port(self.__port)
+ count = int(self.__retriesCount)
+ while count > 0:
+ result = client.run()
+ if result.error == None:
+ if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw):
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file),
+ description='eth {} iperf3'.format(self.__interface),
+ mime='application/json',
+ json_data=result.json,
+ result=self.__resultlist)
+ return
+ else:
+ self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps))
+ else:
+ count = count - 1
+ time.sleep(int(self.__waitRetryTime))
+
+ self.fail('qEth (max tries) Execution error: {}'.format(result.error))
+
+ def execute2(self):
+ # execute iperf command against the server, but it implements attempts in case the server is busy
+ iperfdone = False
+ while not iperfdone:
+ try:
+ p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J",
+ _timeout=20)
+ iperfdone = True
+ except sh.TimeoutException:
+ self.fail("failed: iperf timeout reached")
+ except sh.ErrorReturnCode:
+ time.sleep(self.timebetweenattempts)
+
+ # check if it was executed succesfully
+ if p.exit_code == 0:
+ if p.stdout == "":
+ self.fail("failed: error executing iperf command")
+ # analyze output string
+ data = json.loads(p.stdout.decode('ascii'))
+ self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024
+ # save result file
+ with open('/mnt/station_ramdisk/ethernet-iperf3.json', 'w') as outfile:
+ json.dump(data, outfile, indent=4)
+ outfile.close()
+ self.__resultlist.append(
+ {
+ "description": "iperf3 output",
+ "filepath": "/mnt/station_ramdisk/ethernet-iperf3.json",
+ "mimetype": "application/json"
+ }
+ )
+
+ # check if BW is in the expected range
+ if self.__bwreal < float(self.__bwexpected):
+ self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal))
else:
- self.fail("failed: could not complete iperf command")
+ self.fail("failed: could not complete iperf command.")
- def get_Total_MB(self):
- return self.__MB_real;
+ def getresults(self):
+ return self.__resultlist
- def get_Total_BW(self):
- return self.__MB_real;
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py
deleted file mode 100644
index cdc4109..0000000
--- a/test-cli/test/tests/qflash.py
+++ /dev/null
@@ -1,77 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-from test.helpers.globalVariables import globalVar
-
-class Qflasher(unittest.TestCase):
-
- def __init__(self, testname, testfunc):
- super(Qflasher, self).__init__(testfunc)
- self._testMethodDoc = testname
- model=globalVar.g_mid
- if model.find("IGEP0046") == 0:
- self._flash_method = "mx6"
- self._binlocation="/boot/"
- elif model.find("IGEP0000") == 0:
- self._flash_method = "mx6"
- self._binlocation="/boot/"
- elif model.find("IGEP0034") == 0:
- self._flash_method = "nandti"
- self._binlocation = "/boot/"
- elif model.find("SOPA") == 0:
- self._flash_method = "sopaflash"
- elif model.find("OMAP3") == 0:
- self._flash_method = "nandti"
- self._binlocation = "/boot/"
- elif model.find("OMAP5") == 0:
- self._flash_method = "nandti"
- self._binlocation = "/boot/"
-
-
- def execute(self):
- # CASE eMMC boards.
- if(self._flash_method == "mx6"):
- str_cmd= "dd if={}u-boot.imx of=/dev/mmcblk2 bs=512 seek=2 2>&1 >/dev/null".format(self._binlocation)
- flash_command = SysCommand("flash_command", str_cmd)
- if flash_command.execute() == 0:
- str_cmd = "sync"
- sync_command = SysCommand("sync_command", str_cmd)
- if sync_command.execute() != 0:
- self.fail("failed: could not sync")
- else:
- self.fail("failed: could not complete flash eMMC commands")
- # CASE of nandflash boards
- elif(self._flash_method == "nandti"):
- str_cmd = "nandwrite -p -s 0x0 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \
- "nandwrite -p -s 0x20000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \
- "nandwrite -p -s 0x40000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \
- "nandwrite -p -s 0x60000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \
- "".format(self._binlocation, self._binlocation, self._binlocation, self._binlocation,)
- flash_MLO_command = SysCommand("flash_MLO_command", str_cmd)
- if flash_MLO_command.execute() == 0:
- str_cmd= "nandwrite -p /dev/mtd1 {}u-boot.img 2>&1 >/dev/null".format(self._binlocation)
- flash_uBoot_command = SysCommand("flash_command", str_cmd)
- if flash_uBoot_command.execute() == 0:
- str_cmd = "sync"
- sync_command = SysCommand("sync_command", str_cmd)
- if sync_command.execute() != 0:
- self.fail("failed: could not sync")
- else:
- self.fail("failed: could not complete flash u-boot commnds")
- else:
- self.fail("failed: could not complete flash MLO commnd")
-
-
- # CASE of SOPA0000 BOARD. FULL FLASH (u-boot+kernel+rootfs)
- elif (self._flash_method == "sopaflash"):
- str_cmd = "nandtest /dev/mtd0 >/dev/null"
- flash_command = SysCommand("flash_command", str_cmd)
- if flash_command.execute() == 0:
- str_cmd = "/usr/bin/igep-flash --skip-nandtest --image /opt/firmware/demo-ti-image-*-*.tar* >/dev/null 2>&1"
- sync_command = SysCommand("sync_command", str_cmd)
- if sync_command.execute() != 0:
- self.fail("failed: could not complete flashing procces")
- else:
- self.fail("failed: could not complete nandtest mtd0")
-
- else:
- self.fail("failed: could not find flash method")
diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py
deleted file mode 100644
index 409005c..0000000
--- a/test-cli/test/tests/qi2c.py
+++ /dev/null
@@ -1,29 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-
-class Qi2c(unittest.TestCase):
-
- def __init__(self, testname, testfunc, busnum, register):
- super(Qi2c, self).__init__(testfunc)
- self.__busnum = busnum
- self.__register = register.split("/")
- self.__devices=[]
- self._testMethodDoc = testname
-
- def execute(self):
- str_cmd= "i2cdetect -a -y -r {}".format(self.__busnum)
- i2c_command = SysCommand("i2cdetect", str_cmd)
- if i2c_command.execute() == 0:
- self.__raw_out = i2c_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines=self.__raw_out.decode('ascii').splitlines()
- for i in range(len(lines)):
- if (lines[i].count('UU')):
- if (lines[i].find("UU")):
- self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1]))
- for i in range(len(self.__register)):
- if not(self.__register[i] in self.__devices):
- self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum))
- else:
- self.fail("failed: could not complete i2cdedtect command")
diff --git a/test-cli/test/tests/qiperf.py b/test-cli/test/tests/qiperf.py
deleted file mode 100644
index 126b6ee..0000000
--- a/test-cli/test/tests/qiperf.py
+++ /dev/null
@@ -1,53 +0,0 @@
-from test.helpers.syscmd import SysCommand
-
-
-class QIperf(object):
- __sip = None
- __raw_out = None
- __MB_req = None
- __MB_real = None
- __BW_real = None
- __dat_list = None
- __bind = None
-
- def __init__(self, sip = None):
- if sip is not None:
- self.__sip = sip
- self.__MB_req = '10'
-
- def execute(self, sip = None, bind = None):
- if sip is not None:
- self.__sip = sip
-
- if bind is None:
- str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req)
- else:
- self.__bind = bind
- str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind)
- t = SysCommand("iperf", str_cmd)
- if t.execute() == 0:
- self.__raw_out = t.getOutput()
- if self.__raw_out == "":
- return -1
- lines = t.getOutput().splitlines()
- dat = lines[1]
- dat = dat.decode('ascii')
- dat_list = dat.split( )
- for d in dat_list:
- a = dat_list.pop(0)
- if a == "sec":
- break
- self.__MB_real = dat_list[0]
- self.__BW_real = dat_list[2]
- self.__dat_list = dat_list
- print(self.__MB_real)
- print(self.__BW_real)
- else:
- return -1
- return 0
-
- def get_Total_MB(self):
- return self.__MB_real;
-
- def get_Total_BW(self):
- return self.__MB_real;
diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py
new file mode 100644
index 0000000..0f5a0c1
--- /dev/null
+++ b/test-cli/test/tests/qmmcflash.py
@@ -0,0 +1,55 @@
+import unittest
+import scanf
+from scanf import scanf
+from sh import mmc
+from sh import ErrorReturnCode
+from test.helpers.utils import save_file_to_disk
+from test.helpers.utils import sys_read
+
+class Qmmcflash(unittest.TestCase):
+ params = None
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qmmcflash, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+
+ self.__QeMMCName = varlist.get('name', 'qemmc')
+ self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk"))
+ self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt"))
+ self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0"))
+ self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001"))
+
+ self.__resultlist = []
+
+ def execute(self):
+ try:
+ dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice))
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file),
+ description='eMMC health test',
+ mime='text/plain',
+ data=dataOut.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID)
+ r, data = sys_read("{}/life_time".format(sysDevice))
+ if not r:
+ self.fail("emmc: life_time not found")
+ res = scanf("0x%d 0x%d", data)
+ if res[0] > 3 or res[1] > 3:
+ self.fail("emmc: review {} life_time > 3".format(sysDevice))
+ r, data = sys_read("{}/pre_eol_info".format(sysDevice))
+ if not r:
+ self.fail("emmc: pre_eol_info not found")
+ res = scanf("0x%d", data)
+ if res[0] != 1:
+ self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice))
+ except ErrorReturnCode as Error:
+ self.fail("emmc: failed {} ".format(Error.exit_code))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return "" \ No newline at end of file
diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py
new file mode 100644
index 0000000..6de1eaf
--- /dev/null
+++ b/test-cli/test/tests/qnand.py
@@ -0,0 +1,48 @@
+import unittest
+import sh
+import uuid
+from test.helpers.utils import save_file_to_disk
+
+class Qnand(unittest.TestCase):
+ params = None
+ __device = "10M"
+ __resultlist = None # resultlist is a python list of python dictionaries
+
+ # varlist: device
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qnand, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QNandName = varlist.get('name', 'qnand')
+ self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0"))
+ self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk"))
+ self.__file_uuid = uuid.uuid4()
+ self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid)
+ nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"')
+
+ if nandtestPath == '':
+ self.myNandTest = sh.nandtest
+ else:
+ self.myNandTest = sh.Command(nandtestPath)
+ self.__resultlist = []
+
+ def execute(self):
+ try:
+ res = self.myNandTest("-m", self.__device)
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file),
+ description='nand {} test'.format(self.__device),
+ mime='text/plain',
+ data=res.stdout.decode('utf-8'),
+ result=self.__resultlist)
+
+ except sh.ErrorReturnCode_1 as e:
+ self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout))
+
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py
new file mode 100644
index 0000000..28310c6
--- /dev/null
+++ b/test-cli/test/tests/qplc.py
@@ -0,0 +1,121 @@
+import shutil
+import unittest
+import os.path
+import os
+import sh
+import stat
+import time
+import ping3
+from netifaces import AF_INET
+import netifaces as ni
+
+from test.helpers.md5 import md5_file
+from test.helpers.plc import dcpPLC
+from test.helpers.iseelogger import logObj
+
+class Qplc(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __QPLCName = None
+ __plc = None
+ __board_uuid = None
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qplc, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__psdbObj = varlist["db"]
+ self.__board_uuid = varlist["boarduuid"]
+
+ self.__QPLCName = varlist.get('name', 'qplc')
+ self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", ""))
+ self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", ""))
+ self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", ""))
+ self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0"))
+ self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0"))
+ self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0"))
+ self.__firmware_Path = varlist.get('firmwarepath', self.__xmlObj.getKeyVal(self.__QPLCName, "firmwarepath", "/root/hwtest-files/firmware"))
+ self.__skipflash = varlist.get('skipflash', self.__xmlObj.getKeyVal(self.__QPLCName, "skipflash", "0"))
+ self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device)
+ self.__factory_password = varlist.get('factory_password', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_password", "0"))
+ self.__firmware_password = varlist.get('firmware_password', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_password", "0"))
+ self.__plc_reset_wait = varlist.get('plc_reset_wait', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_reset_wait", "60"))
+ self.__resultlist = []
+
+
+ def checkfiles(self):
+ if not os.path.isfile(self.__factoryTool):
+ return False, 'Factory tool Not found {}/{}'.format(self.__factoryTool)
+ if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware)):
+ return False, 'Firmware Not found {}/{}'.format(self.__firmware_Path, self.__firmware)
+ if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware_md5)):
+ return False, 'Firmware md5 Not found {}/{}'.format(self.__firmware_Path, self.__firmware_md5)
+ try:
+ mode = os.stat(self.__mtd_device).st_mode;
+ if not stat.S_ISCHR(mode):
+ return False, 'No device {} found'.format(self.__mtd_device)
+ except:
+ return False, 'No device {} found'.format(self.__mtd_device)
+ return True, ''
+
+ def __flashMemory(self):
+ r, v = self.__plc.SaveFirmware('{}/{}'.format(self.__firmware_Path, self.__firmware))
+ if not r:
+ self.fail(v)
+
+ def execute(self):
+ # Check files
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ # do flash & verify
+ if self.__skipflash == '0':
+ self.__flashMemory()
+ # do Plc boot
+ self.__plc.setBootMode()
+ # Wait plc goes UP
+ if self.__gen_mac:
+ res, v = self.__psdbObj.get_plc_macaddr(self.__board_uuid)
+ if res == None and v == 'MAC limit by uuid':
+ self.fail('MAC limit by uuid')
+ elif res == None:
+ self.fail('BUG: ?????')
+ logObj.getlogger().info("MAC {}".format(res[0]))
+ plcMAC = res[0][0]
+ # wait for PLC to be turned on
+ attemptcounter = 0
+ plcfound = False
+ while attemptcounter < 5 and not plcfound:
+ if self.__plc.discover():
+ plcfound = True
+ else:
+ attemptcounter += 1
+ time.sleep(5)
+ # mandatory wait before configuring the PLC chip
+ time.sleep(2)
+ self.__plc.set_plc2('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD',
+ self.__factory_password,
+ 'SYSTEM.PRODUCTION.MAC_ADDR',
+ '{}'.format(plcMAC),
+ self.__firmware_password)
+ # plc reset to save changes
+ self.__plc.setPLCReset()
+ time.sleep(int(self.__plc_reset_wait))
+ # calculate the IP of the GPLC0000 board to ping
+ ip_eth01 = ni.ifaddresses('eth0:1')[AF_INET][0]['addr'] # plc_test_ip = 10.10.1.113
+ ip_eth01_splited = ip_eth01.split('.')
+ ip_eth01_splited[3] = str(int(ip_eth01_splited[3]) + 100)
+ plc_test_ip = ".".join(ip_eth01_splited) # plc_test_ip = 10.10.1.213
+ # ping against the GPLC0000 board
+ ping3.EXCEPTIONS = True
+ try:
+ ping3.ping(plc_test_ip, timeout=0.1)
+ except:
+ self.fail('PLC ping timeout')
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py
index 8ec0210..49d4d54 100644
--- a/test-cli/test/tests/qram.py
+++ b/test-cli/test/tests/qram.py
@@ -1,22 +1,62 @@
-from test.helpers.syscmd import SysCommand
import unittest
+import sh
+# from test.helpers.iseelogger import MeasureTime
+# from test.helpers.iseelogger import logObj
+from test.helpers.utils import save_file_to_disk
class Qram(unittest.TestCase):
+ params = None
+ __memsize = None
+ __loops = None
+ __resultlist = [] # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, memSize):
+ # varlist: memsize, loops
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qram, self).__init__(testfunc)
- self.__memSize = memSize
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
- def execute(self):
- str_cmd= "free -m"
- free_command = SysCommand("free_ram", str_cmd)
- if free_command.execute() == 0:
- self.__raw_out = free_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines = free_command.getOutput().splitlines()
- aux = [int(s) for s in lines[1].split() if s.isdigit()]
- self.failUnless(int(aux[0])>int(self.__memSize),"failed: total ram memory size lower than expected")
+ self.__QramName = varlist.get('name', 'qram')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1"))
+ self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk"))
+ self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt"))
+
+ self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0"))
+ self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0"))
+
+ memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '')
+
+ if memtesterPath == '':
+ self.myMemtester = sh.memtester
else:
- self.fail("failed: could not complete iperf command")
+ self.myMemtester = sh.Command(memtesterPath)
+
+ def execute(self):
+ self.__resultlist = []
+ try:
+ # mytime = MeasureTime()
+ res = self.myMemtester(self.__memsize, '{}'.format(self.__loops))
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file),
+ description='Ram result test',
+ mime='text/plain',
+ data=res.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ # mytime.stop()
+ except sh.ErrorReturnCode as e:
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file),
+ description='Ram result test',
+ mime='text/plain',
+ data=e.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8')))
+
+ except Exception as details:
+ self.fail('Error: {}'.format(details))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py
index 1d02f78..df493d2 100644
--- a/test-cli/test/tests/qrtc.py
+++ b/test-cli/test/tests/qrtc.py
@@ -1,28 +1,46 @@
-from test.helpers.syscmd import SysCommand
+import sh
import unittest
import time
+import re
+
class Qrtc(unittest.TestCase):
+ params = None
+ __rtc = None
+ __resultlist = None # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, rtc):
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qrtc, self).__init__(testfunc)
- self.__rtc = rtc
+ if "rtc" in varlist:
+ self.__rtc = varlist["rtc"]
+ else:
+ raise Exception('rtc param inside Qrtc must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
- str_cmd = "hwclock -f {}".format(self.__rtc)
- rtc_set = SysCommand("rtc_set", str_cmd)
- if rtc_set.execute() == 0:
- curr_hour = rtc_set.getOutput().decode('ascii').split(" ")
- time1 = int((curr_hour[4].split(":"))[2])
+ # get time from RTC peripheral
+ p = sh.hwclock("-f", self.__rtc)
+ if p.exit_code == 0:
+ time1 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}",
+ p.stdout.decode('ascii'))
time.sleep(1)
- if rtc_set.execute() == 0:
- curr_hour = rtc_set.getOutput().decode('ascii').split(" ")
- time2 = int((curr_hour[4].split(":"))[2])
- if time1==time2:
+ # get time from RTC another time
+ p = sh.hwclock("-f", self.__rtc)
+ if p.exit_code == 0:
+ time2 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}",
+ p.stdout.decode('ascii'))
+ # check if the seconds of both times are different
+ if time1 == time2:
self.fail("failed: RTC is not running")
else:
- self.fail("failed: couldn't execute hwclock command 2nd time")
+ self.fail("failed: couldn't execute hwclock command for the second time")
else:
self.fail("failed: couldn't execute hwclock command")
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py
deleted file mode 100644
index b8a5a48..0000000
--- a/test-cli/test/tests/qscreen.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-import time
-from test.helpers.cv_display_test import pattern_detect
-
-class Qscreen(unittest.TestCase):
-
- def __init__(self, testname, testfunc, display):
- super(Qscreen, self).__init__(testfunc)
- self.__display = display
- self._testMethodDoc = testname
-
- def execute(self):
- str_cmd = "fbi -T 1 --noverbose -d /dev/{} test/files/test_pattern.png".format(self.__display)
- display_image = SysCommand("display_image", str_cmd)
- if display_image.execute() == -1:
- test_screen = pattern_detect(1)
- if not test_screen=="0":
- self.fail("failed: {}".format(test_screen))
- str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a"
- show_img = SysCommand("show-image", str_cmd)
- show_img.execute()
- else:
- self.fail("failed: could not display the image")
- try:
- str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a"
- show_img = SysCommand("show-image", str_cmd)
- show_img.execute()
- except ValueError:
- print("COULD NOT DISPLAY IMAGE")
-
diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py
index 43ba3c8..402e33f 100644
--- a/test-cli/test/tests/qserial.py
+++ b/test-cli/test/tests/qserial.py
@@ -1,30 +1,55 @@
-from test.helpers.syscmd import SysCommand
import unittest
import uuid
import serial
import time
+
class Qserial(unittest.TestCase):
+ params = None
+ __port = None
+ __baudrate = None
+ __resultlist = None # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, port, baudrate):
+ # varlist: port, baudrate
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qserial, self).__init__(testfunc)
- self.__port = port
- self.__serial = serial.Serial(self.__port, timeout=1)
- self.__serial.baudrate = baudrate
+ if "port" in varlist:
+ self.__port = varlist["port"]
+ else:
+ raise Exception('port param inside Qserial must be defined')
+
+ if "baudrate" in varlist:
+ self.__baudrate = varlist["baudrate"]
+ else:
+ raise Exception('baudrate param inside Qserial must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
+
+ # open serial connection
+ self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1)
def __del__(self):
self.__serial.close()
def execute(self):
+ # generate a random uuid
+ test_uuid = str(uuid.uuid4()).encode()
+ # clean serial port
self.__serial.flushInput()
self.__serial.flushOutput()
- test_uuid = str(uuid.uuid4()).encode()
+ # send the uuid through serial port
self.__serial.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port))
else:
- if (self.__serial.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port))
+ # check if what it was sent is equal to what has been received
+ if self.__serial.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port))
+
+ def getresults(self):
+ return self.__resultlist
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qtemplate.py b/test-cli/test/tests/qtemplate.py
deleted file mode 100644
index 940cded..0000000
--- a/test-cli/test/tests/qtemplate.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#IF COMMAND IS NEEDED
-from test.helpers.syscmd import SysCommand
-import unittest
-#class name
-class Qtemplate(unittest.TestCase):
- # Initialize the variables
- __variable1 = "Value-a"
- __variable2 = "Value-b"
- #....
- __variablen = "Value-n"
-
- def __init__(self, testname, testfunc, input1=None, inputn=None):
- # Doing this we will initialize the class and later on perform a particular method inside this class
- super(Qtemplate, self).__init__(testfunc)
- self.__testname = testname
- self.__input1 = input1
- self.__inputn = inputn
- self._testMethodDoc = testname
-
-
-
- def execute(self):
- str_cmd = "command"
- t = SysCommand("command-name", str_cmd)
- if t.execute() == 0:
- self.__raw_out = t.getOutput()
- if self.__raw_out == "":
- return -1
- lines = t.getOutput().splitlines()
- dat = lines[1]
- dat = dat.decode('ascii')
- dat_list = dat.split( )
- for d in dat_list:
- a = dat_list.pop(0)
- if a == "sec":
- break
- self.__MB_real = dat_list[0]
- self.__BW_real = dat_list[2]
- self.__dat_list = dat_list
- print(self.__MB_real)
- print(self.__BW_real)
- self.failUnless(int(self.__BW_real)>int(self.__OKBW)*0.9,"FAIL:BECAUSE...")
- else:
- return -1
- return 0
-
- def get_Total_MB(self):
- return self.__MB_real;
-
- def get_Total_BW(self):
- return self.__MB_real;
diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py
index 44490bc..4f76d0c 100644
--- a/test-cli/test/tests/qusb.py
+++ b/test-cli/test/tests/qusb.py
@@ -1,60 +1,95 @@
-from test.helpers.syscmd import SysCommand
+import shutil
import unittest
+import os.path
+import os
+import sh
+
+from test.helpers.md5 import md5_file
class Qusb(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __QusbName = None
- def __init__(self, testname, testfunc, devLabel, numPorts):
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qusb, self).__init__(testfunc)
- self.__numPorts = numPorts
self._testMethodDoc = testname
- self.__devLabel = devLabel
- if testname=="USBOTG":
- self.__usbFileName = "/this_is_an_usb_otg"
- self.__usbtext = "USBOTG"
- elif testname=="SATA":
- self.__usbFileName = "/this_is_a_sata"
- self.__usbtext = "SATA"
- else:
- self.__usbFileName = "/this_is_an_usb_host"
- self.__usbtext = "USBHOST"
- self.__numUsbFail=[]
+ self.__xmlObj = varlist["xml"]
+
+ self.__QusbName = varlist.get('name', 'qusb')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3"))
+ self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files"))
+ self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin"))
+ self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5"))
+ self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2"))
+ self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", ""))
+
+ self.__resultlist = []
+
+ def removefileIfExist(self, fname):
+ if os.path.exists(fname):
+ try:
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''
+
+ def checkfiles(self):
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)):
+ return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source)
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)):
+ return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5)
+ if self.__check_mounted != '':
+ if not os.path.ismount('{}'.format(self.__check_mounted)):
+ return False, 'Required mounted failed: {}'.format(self.__path_to)
+ r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if not r:
+ return r, s
+ return True, ''
+
+ def getTestFile_md5(self, fname):
+ t = ''
+ with open('{}'.format(fname), 'r') as outfile:
+ t = outfile.read().rstrip("\n")
+ outfile.close()
+ return t
+
+ def ExecuteTranfer(self):
+ try:
+ originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5))
+ if originalMD5 == '':
+ self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5))
+ # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.sync()
+ md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source))
+ self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if originalMD5 != md5val:
+ return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val)
+ except OSError as why:
+ return False,'Error: {} tranfer failed'.format(why.errno)
+ except sh.ErrorReturnCode as shError:
+ return False, 'USB Exception: tranfer failed'
+ except Exception as details:
+ return False, 'USB Exception: {} tranfer failed'.format(details)
+ return True, ''
def execute(self):
- str_cmd= "lsblk -o LABEL"
- lsblk_command = SysCommand("lsblk", str_cmd)
- if lsblk_command.execute() == 0:
- self.__raw_out = lsblk_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines = lsblk_command.getOutput().splitlines()
- host_list=[]
- for i in range(len(lines)):
- if str(lines[i].decode('ascii'))==self.__devLabel:
- host_list.append(i)
- if len(host_list)==int(self.__numPorts):
- str_cmd = "lsblk -o MOUNTPOINT"
- lsblk_command = SysCommand("lsblk", str_cmd)
- if lsblk_command.execute() == 0:
- self.__raw_out = lsblk_command.getOutput()
- if self.__raw_out == "":
- print("failed: no command output")
- self.fail("failed: no command output")
- else:
- lines = lsblk_command.getOutput().splitlines()
- for i in range(len(host_list)):
- file_path=str(lines[host_list[i]].decode('ascii')) + self.__usbFileName
- usb_file = open(file_path, 'r')
- read=usb_file.read()
- if read.find(self.__usbtext)!=-1:
- print(file_path + " --> OK!")
- else:
- self.fail("failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii')))
- self.__numUsbFail.append(host_list[i])
- usb_file.close()
- else:
- self.fail("failed: couldn't execute lsblk command")
-
- else:
- self.fail("failed: reference and real usb host devices number mismatch")
- else:
- self.fail("failed: couldn't execute lsblk command")
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ countLoops = int(self.__loops)
+ while countLoops > 0:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ self.fail(msg)
+ countLoops = countLoops - 1
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qvideo.py b/test-cli/test/tests/qvideo.py
new file mode 100644
index 0000000..225940b
--- /dev/null
+++ b/test-cli/test/tests/qvideo.py
@@ -0,0 +1,125 @@
+import unittest
+import sh
+import time
+from test.helpers.camara import Camara
+from test.helpers.detect import Detect_Color
+from test.helpers.sdl import SDL2_Test
+
+class Qvideo(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qvideo, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QVideoName = varlist.get('name', 'qvideo')
+ self.__resultlist = []
+ self.__w = int(varlist.get('capture_size_w', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_w", 1280)))
+ self.__h = int(varlist.get('capture_size_h', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_h", 720)))
+ self.__discard_frames_Count = int(varlist.get('capture_discardframes',self.__xmlObj.getKeyVal(self.__QVideoName, "capture_discardframes", 3)))
+ self.__frame_mean = int(varlist.get('capture_framemean', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_framemean", 3)))
+ self.__max_failed = int(varlist.get('capture_maxfailed', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_maxfailed", 1)))
+ self.__cam_setupscript = varlist.get('cam_setupfile', self.__xmlObj.getKeyVal(self.__QVideoName, "cam_setupfile",
+ "/root/hwtest-files/board/scripts/v4l-cam.sh"))
+ self.__sdl_display = varlist.get('sdl_display', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_display", ":0"))
+ self.__sdl_driver = varlist.get('sdl_driver', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_driver", "x11"))
+ self.__camdevice = varlist.get('camdevice', self.__xmlObj.getKeyVal(self.__QVideoName, "camdevice", "video0"))
+
+ self.__Camara = Camara(setup_script_path=self.__cam_setupscript, device=self.__camdevice, width=self.__w, height=self.__h)
+ self.__SDL2_Test = SDL2_Test(driver=self.__sdl_driver, display=self.__sdl_display, w=self.__w, h=self.__h)
+ self.__SDL2_Test.Clear()
+
+ def __drop_frames(self, frame_count):
+ count = frame_count
+ while count > 0:
+ _ = self.__Camara.getFrame()
+ count -= 1
+
+ def Verify_Color(self, color):
+ self.paintColor(color)
+ count = self.__frame_mean
+ res_ok = 0
+ res_failed = 0
+ r_count = 0
+ c_mean = 0
+ while count > 0:
+ res, t = self.Capture(color)
+ if res:
+ res_ok += 1
+ else:
+ res_failed += 1
+ if t == "count":
+ r_count += 1
+ else:
+ c_mean +=1
+ count -= 1
+ self.unpaintColor(color)
+ if res_failed > self.__max_failed:
+ if r_count > 0:
+ return '{}_COLOR_FAILED'.format(color)
+ elif c_mean > 0:
+ return 'MEAN_{}_FAILED'.format(color)
+ return "ok"
+
+ def paintColor(self, color):
+ self.__SDL2_Test.Paint(color)
+ time.sleep(3)
+ self.__drop_frames(self.__Camara.getFrameCount())
+
+ def unpaintColor(self, color):
+ self.__SDL2_Test.Clear()
+ time.sleep(1)
+
+ def Capture(self, color):
+ image = self.__Camara.getFrame()
+ if image is None:
+ return None
+ dtObject = Detect_Color(image)
+ if color == 'RED':
+ _, _, mean, count = dtObject.getRed()
+ elif color == 'BLUE':
+ _, _, mean, count = dtObject.getBlue()
+ elif color == 'GREEN':
+ c_mask, croped, mean, count = dtObject.getGreen()
+
+ return self.checkThreshold(color, mean, count)
+
+ def checkThreshold(self, color, mean, count):
+ min_count = int(
+ self.params.get('{}_min_count'.format(color),
+ self.__xmlObj.getKeyVal(self.__QVideoName, '{}_min_count'.format(color), 50000)))
+
+ str = ""
+ # first verify count
+ if count >= min_count:
+ return True, ""
+ else:
+ str = "count"
+ return False, str
+
+ def execute(self):
+ self.__resultlist = []
+
+ # Open camara
+ if not self.__Camara.Open():
+ self.fail('Error: USB camera not found')
+ # discard initial frames
+ self.__drop_frames(self.__discard_frames_Count)
+ # Test
+ res = self.Verify_Color('RED')
+ if res != "ok":
+ self.fail("failed: RED verification failed")
+ res = self.Verify_Color('BLUE')
+ if res != "ok":
+ self.fail("failed: BLUE verification failed")
+ res = self.Verify_Color('GREEN')
+ if res != "ok":
+ self.fail("failed: GREEN verification failed")
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py
index d08149b..4695041 100644
--- a/test-cli/test/tests/qwifi.py
+++ b/test-cli/test/tests/qwifi.py
@@ -1,45 +1,99 @@
-from test.helpers.syscmd import SysCommand
import unittest
-import subprocess
+import time
+import uuid
+import iperf3
+import netifaces
+from test.helpers.iw import iwScan
+from test.helpers.utils import save_json_to_disk
+from test.helpers.utils import station2Port
class Qwifi(unittest.TestCase):
+ __serverip = None
+ __numbytestx = None
+ __bwexpected = None
+ __port = None
+ params = None
+ __bwreal = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ timebetweenattempts = 5
- def __init__(self, testname, testfunc, signal):
+ # varlist content: serverip, bwexpected, port
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qwifi, self).__init__(testfunc)
- self.__signal = signal
self._testMethodDoc = testname
- # WiFi SERVERIP fixed at the moment.
- self._serverip = "192.168.5.1"
+ self.__xmlObj = varlist["xml"]
+ self.__QwifiName = varlist.get('name', 'qwifi')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1"))
+ self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000"))
+ self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0"))
+ self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost"))
+ self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10"))
+ self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk"))
+ self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5"))
+ self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10"))
+ self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json"))
+ self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json"))
+ self.__file_uuid = uuid.uuid4()
+ self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid)
+ self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid)
+ self.__resultlist = []
+
+ def checkInterface(self, reqInterface):
+ ifaces = netifaces.interfaces()
+ for t in ifaces:
+ if t == reqInterface:
+ return True
+ return False
def execute(self):
- # First check connection with the wifi server using ping command
- #str_cmd = "ping -c 1 {} > /dev/null".format(self._serverip)
- #wifi_ping = SysCommand("wifi_ping", str_cmd)
- #wifi_ping.execute()
- #res = subprocess.call(['ping', '-c', '1', self._serverip])
- p = subprocess.Popen(['ping','-c','1',self._serverip,'-W','2'],stdout=subprocess.PIPE)
- p.wait()
- res=p.poll()
- if res == 0:
- str_cmd= "iw wlan0 link"
- wifi_stats = SysCommand("wifi_stats", str_cmd)
- if wifi_stats.execute() == 0:
- w_stats = wifi_stats.getOutput().decode('ascii').splitlines()
- #self._longMessage = str(self.__raw_out).replace("'", "")
- if not w_stats[0] == "Not connected.":
- signal_st = w_stats[5].split(" ")[1]
- try:
- int(signal_st)
- if -1*int(signal_st) > int(self.__signal):
- self.fail("failed: signal to server lower than expected")
- except ValueError:
- self.fail("failed: error output (Bad connection?)")
+ self.__resultlist = []
+ stationList = {}
+ # check interfaces
+ if not self.checkInterface(self.__interface):
+ self.fail('Interface not found: {}'.format(self.__interface))
+ # scan networks
+ scanner = iwScan(self.__interface)
+ if scanner.scan():
+ stationList = scanner.getStations()
+ # check if we are connected
+ if not scanner.findConnected():
+ self.fail('Not connected to test AP')
+ else:
+ strError = scanner.getLastError()
+ self.fail('qWifi scanner Execution error: {}'.format(strError))
+ # Start Iperf
+ client = iperf3.Client()
+ client.duration = int(self.__duration)
+ client.server_hostname = self.__serverip
+ client.port = station2Port(self.__port)
+ count = int(self.__retriesCount)
+ while count > 0:
+ result = client.run()
+ if result.error == None:
+ if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw):
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file),
+ description='wifi scan',
+ mime='application/json',
+ json_data=stationList,
+ result=self.__resultlist)
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file),
+ description='wifi {} iperf3'.format(self.__interface),
+ mime='application/json',
+ json_data=result.json,
+ result=self.__resultlist)
+ return
else:
- self.fail("failed: error output (Bad connection?)")
- #tx_brate = float(w_stats[6].split(" ")[2])
+ self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps))
else:
- self.fail("failed: couldn't execute iw command")
- else:
- self.fail("failed: ping to server {}".format(self._serverip))
+ count = count - 1
+ time.sleep(int(self.__waitRetryTime))
+
+ self.fail('qWifi (max tries) Execution error: {}'.format(result.error))
+ def getresults(self):
+ return self.__resultlist
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test_main.py b/test-cli/test_main.py
index 3c4d1cb..1f5d1e4 100644
--- a/test-cli/test_main.py
+++ b/test-cli/test_main.py
@@ -1,118 +1,431 @@
-from test.helpers.get_dieid import genDieid
-from subprocess import call
-import xml.etree.ElementTree as XMLParser
-import errno
-import sys
+import time
+import sh
import os
import unittest
+import socket
+import logging
+import sys
+import getopt
+import random
+
+from datetime import datetime
+from test.helpers.int_registers import get_die_id
+from test.helpers.int_registers import get_internal_mac
+from subprocess import call
from test.helpers.testsrv_db import TestSrv_Database
+from test.helpers.setup_xml import XMLSetup
from test.runners.simple import SimpleTestRunner
-from test.tests.qbutton import Qbutton
-from test.helpers.syscmd import TestSysCommand
-from test.helpers.syscmd import SysCommand
-from test.tests.qiperf import QIperf
from test.tests.qethernet import Qethernet
-from test.tests.qaudio import Qaudio
from test.tests.qram import Qram
from test.tests.qusb import Qusb
-from test.tests.qi2c import Qi2c
from test.tests.qeeprom import Qeeprom
from test.tests.qserial import Qserial
-from test.tests.qscreen import Qscreen
from test.tests.qwifi import Qwifi
from test.tests.qrtc import Qrtc
from test.tests.qduplex_ser import Qduplex
-from test.tests.qamp import Qamp
-from test.tests.qflash import Qflasher
-from test.helpers.finisher import Finisher
+from test.tests.qamper import Qamper
+from test.tests.qnand import Qnand
+from test.tests.qaudio import Qaudio
+from test.tests.qdmesg import Qdmesg
+from test.tests.qmmcflash import Qmmcflash
+from test.tests.qplc import Qplc
+from test.tests.qvideo import Qvideo
from test.helpers.globalVariables import globalVar
+from test.tasks.flasheeprom import EEprom_Flasher
+from test.tasks.flashmemory import NAND_Flasher
+from test.helpers.qrreader import QRReader
+from test.helpers.cmdline import LinuxKernelCmd
+from test.helpers.amper import Amper
+from test.enums.StationStates import StationStates
+import xml.etree.ElementTree as XMLParser
+from test.helpers.iseelogger import logObj
+from test.helpers.utils import str2bool
+
+# global variables
+psdbObj = TestSrv_Database()
+xmlObj = None
+test_abspath = None
+tests_manually_executed = []
+AutoTest = False
+OverrideAutoReboot = False
+delay_reboot = '0'
+
# define clear function
def clear():
# check and make call for specific operating system
- _ = call('clear' if os.name =='posix' else 'cls')
+ _ = call('clear' if os.name == 'posix' else 'cls')
+def reboot_board():
+ logObj.getlogger().debug("#reboot board#")
+ if not OverrideAutoReboot:
+ set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name)
+ if delay_reboot == '0':
+ time.sleep(5)
+ sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True)
+ # sh.reboot(_bg=True)
+ print("REBOOT...")
+ exit(0)
-def create_board():
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- tree = XMLParser.parse('setup.xml')
- root = tree.getroot()
- suite = unittest.TestSuite()
- for element in root.iter('board'):
- # print(str(element.tag) + str(element.attrib))
- model_id = element.attrib['model']
- variant = element.attrib['variant']
- nstation = element.attrib['station']
- globalVar.g_mid=model_id + "-" + variant
- globalVar.station=nstation
- processor_id=genDieid(globalVar.g_mid)
- print(globalVar.g_mid)
- print(processor_id)
- globalVar.g_uuid = psdb.create_board(processor_id, model_id, variant, bmac = None)
-
-def testsuite():
- psdb=TestSrv_Database()
- psdb.open("setup.xml")
- suite = unittest.TestSuite()
- tests=psdb.getboard_comp_test_list(globalVar.g_uuid)
- for i in range(len(tests)):
- #newstr = oldstr.replace("M", "")
- variables=str(tests[i][0]).split(",")
- testname=variables[0].replace('(', '')
- testdes=variables[1]
- testfunc=variables[2]
- if len(tests)>2:
- testparam=variables[3].replace(')', '')
- testparam = testparam.replace('"', '')
- testparam = testparam.replace(';', "','")
- if testparam == "":
- command = "suite.addTest({}('{}','execute'))".format(testfunc, testname)
+def wait_for_reboot():
+ while True:
+ currentstate = psdbObj.read_station_state(globalVar.station)
+ if currentstate == StationStates.STATION_REBOOT.name:
+ reboot_board()
+ time.sleep(5)
+
+def execute_station_error(text):
+ logObj.getlogger().error("Station error: #{}#".format(text))
+ wait_for_reboot()
+
+def check_first_state():
+ currentstate = psdbObj.read_station_state(globalVar.station)
+ if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name:
+ logObj.getlogger().debug('station state during the first check is {}'.format(currentstate))
+ wait_for_reboot()
+ else:
+ count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240"))
+ while currentstate != StationStates.WAIT_TEST_START.name:
+ logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate))
+ if count_t <= 0:
+ # Error
+ execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate))
else:
- command="suite.addTest({}('{}','execute','{}'))".format(testfunc,testname,testparam)
+ count_t = count_t - 5
+ time.sleep(5)
+ currentstate = psdbObj.read_station_state(globalVar.station)
+ if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name:
+ wait_for_reboot()
+
+
+def set_next_state(newstate):
+ statewritten = psdbObj.change_station_state(globalVar.station, newstate)
+ if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name:
+ wait_for_reboot()
+
+def reboot_if_autotest():
+ # reset board if AUTOTEST is enabled
+ if AutoTest:
+ reboot_board()
+
+
+def create_paramslist(params):
+ paramlist = {}
+ for varname, varvalue in params:
+ paramlist[varname] = varvalue
+ return paramlist
+
+
+def add_test(suite, testdefname, paramlist):
+ if testdefname == "RAM":
+ suite.addTest(Qram(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "SERIALDUAL":
+ suite.addTest(Qduplex(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "EEPROM":
+ suite.addTest(Qeeprom(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "SERIAL":
+ suite.addTest(Qserial(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "RTC":
+ suite.addTest(Qrtc(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "CONSUMPTION":
+ suite.addTest(Qamper(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "ETHERNET":
+ suite.addTest(Qethernet(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "NAND":
+ suite.addTest(Qnand(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "WIFI":
+ suite.addTest(Qwifi(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "USB":
+ suite.addTest(Qusb(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "AUDIO":
+ suite.addTest(Qaudio(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "DMESG":
+ tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "MMCFLASH":
+ suite.addTest(Qmmcflash(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "PLC_MODEM":
+ suite.addTest(Qplc(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "VIDEO":
+ suite.addTest(Qvideo(testdefname, "execute", paramlist))
+ return 0
+ else:
+ logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname))
+ return 1
+
+
+def create_testsuite():
+ # create an object TestSuite
+ suite1 = unittest.TestSuite()
+ # get list of tests for this board
+ tests = psdbObj.get_tests_list(globalVar.g_uuid)
+ logObj.getlogger().debug("Create TestSuite: {}".format(tests))
+ # loop in every test for this board
+ for testid, testdefname in tests:
+ # get params for this test
+ params = psdbObj.get_test_params_list(testid)
+ paramlist = create_paramslist(params)
+ # add the testid as parameter
+ paramlist["testid"] = testid
+ paramlist["boarduuid"] = globalVar.g_uuid
+ paramlist["testidctl"] = globalVar.testid_ctl
+ paramlist['testPath'] = test_abspath
+ logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist))
+ paramlist["xml"] = xmlObj
+ paramlist["db"] = psdbObj
+ # add test to TestSuite
+ add_test(suite1, testdefname, paramlist)
+
+ return suite1
+
+
+def create_board():
+
+ cmd = LinuxKernelCmd()
+ model_id = cmd.getkvar("bmodel", "none")
+ variant = cmd.getkvar("bvariant", "none")
+ logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant))
+
+ if model_id == "none" or variant == "none":
+ # Error
+ execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant))
+
+ # get model id
+ globalVar.g_mid = model_id + "-" + variant
+ logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid))
+
+ # get station
+ if "Station" not in globalVar.station:
+ # Error
+ execute_station_error("Hostname not defined")
+
+ processor_id = get_die_id(globalVar.g_mid)
+ globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station,
+ get_internal_mac(globalVar.g_mid))
+
+ logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid))
+
+
+def get_taskvars_list(uuid):
+ varlist = {}
+ for varname, varvalue in psdbObj.get_task_variables_list(uuid):
+ varlist[varname] = varvalue
+ return varlist
+
+def execute_extra_task():
+ # list of task
+ myTask = []
+ # Change state to "EXTRATASKS_RUNNING"
+ set_next_state(StationStates.EXTRATASKS_RUNNING.name)
+ # create task control
+ globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
+ # get extra variables
+ varlist = get_taskvars_list(globalVar.g_uuid)
+
+ # Put default Vars
+ varlist["boarduuid"] = globalVar.g_uuid
+ varlist["testidctl"] = globalVar.testid_ctl
+ varlist['testPath'] = test_abspath
+ varlist["xml"] = xmlObj
+ varlist["db"] = psdbObj
+
+ # Get list Extra Task and activation
+ eeprom = varlist.get('eeprom', '0')
+ flashNand = varlist.get('flashNAND', '0')
+
+ if eeprom == '1':
+ myEEprom = EEprom_Flasher(varlist)
+ myTask.append(myEEprom)
+ if flashNand == '1':
+ myNAND = NAND_Flasher(varlist)
+ myTask.append(myNAND)
+
+ taskresult = True
+ for task in myTask:
+ res, data = task.Execute()
+ if res:
+ psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data)
+ logObj.getlogger().info("task: {} ok".format(task.getTaskName()))
else:
- print(testname)
- command = "suite.addTest({}('{}','execute'))".format(testfunc, testname)
- exec(command)
- globalVar.testid_ctl=psdb.open_testbatch(globalVar.g_uuid)
- return suite
-
-
-def finish_test():
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- auxs = psdb.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl)
- globalVar.fstatus = auxs[0][0]
- # Burn eeprom struct
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- # We should call getboard_eeprom only if test was ok
- if globalVar.fstatus:
- aux = psdb.getboard_eeprom(globalVar.g_uuid)
- finish = Finisher(aux)
- finish.end_ok()
+ psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data)
+ logObj.getlogger().info("task: {} failed".format(task.getTaskName()))
+ taskresult = False
+
+ # check final taskresult => ok or fail
+ if taskresult:
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
else:
- finish = Finisher(globalVar.g_uuid)
- finish.end_fail()
- # Update set_test current_test with 'END' so that it finally gets painted in green
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, "END","FINISH")
-
-def main():
- #addtesttomodel()
- #addtestdef()
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
+ for task in myTask:
+ if task.getError() != '':
+ execute_station_error("Unable to complete extra tasks: {}".format(task.getError()))
+ # Not emit a error message
+ execute_station_error("Unable to complete extra tasks")
+
+def getBarCode(fake):
+
+ set_next_state(StationStates.WAITING_FOR_SCANNER.name)
+ if fake:
+ psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001)))
+ return
+ qrreceived = False
+ wait_count = psdbObj.get_setup_variable("test_main", xmlObj.getKeyVal("test_main", "qr_wait_time", "120"))
+ while not qrreceived or int(wait_count) > 0:
+ qr = QRReader()
+ if qr.openQR():
+ # waits 5s to receive a valid code
+ if qr.readQRasync(10):
+ qrreceived = True
+ factorycode = qr.getQRNumber()
+ psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
+ return
+ qr.closeQR()
+ else:
+ qrreceived = True
+ del qr
+ time.sleep(1)
+ wait_count = wait_count - 1
+ if int(wait_count) == 0:
+ execute_station_error("Cannot find a barcode scanner")
+
+
+def execute_test ():
+ # initialize the board
create_board()
- #globalVar.g_uuid = "1f59c654-0cc6-11e8-8d51-e644f56b8edd"
- try:
- os.remove("test_results.dat")
- except:
- pass
- runner = SimpleTestRunner()
- runner.run(testsuite())
- finish_test()
+ # create a process
+ globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)
+ logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl))
+
+ # Change state to "TESTS_RUNNING"
+ set_next_state(StationStates.TESTS_RUNNING.name)
+
+ # generate suits
+ suite1 = create_testsuite()
+ # Execute tests (round 1)
+ runner1 = SimpleTestRunner(psdbObj)
+ logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl))
+ testresult = runner1.run(suite1)
+ logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult))
+
+ # Execute manually tests
+ for test in tests_manually_executed:
+ logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName()))
+ mtestResult = test.execute()
+ logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(),
+ 'ok' if mtestResult else 'fail'))
+
+ # execute aditional tasks, only if the test was successful
+ if testresult.wasSuccessful():
+ logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl))
+ set_next_state(StationStates.TESTS_OK.name)
+ logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl))
+ execute_extra_task()
+ logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl))
+ getBarCode(AutoTest)
+ set_next_state(StationStates.FINISHED.name)
+ logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl))
+ else:
+ logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl))
+ # Change state to "TESTS_FAILED"
+ set_next_state(StationStates.TESTS_FAILED.name)
+
+ # reset board if AUTOTEST is enabled
+ reboot_if_autotest()
+
+
+def enableFaulthandler():
+ """ Enable faulthandler for all threads.
+
+ If the faulthandler package is available, this function disables and then
+ re-enables fault handling for all threads (this is necessary to ensure any
+ new threads are handled correctly), and returns True.
+
+ If faulthandler is not available, then returns False.
+ """
+ try:
+ import faulthandler
+ # necessary to disable first or else new threads may not be handled.
+ faulthandler.disable()
+ faulthandler.enable(all_threads=True)
+ return True
+ except ImportError:
+ return False
if __name__ == "__main__":
+ # Clear the shell screen
clear()
- main()
+
+ enableFaulthandler()
+
+ test_abspath = os.path.dirname(os.path.abspath(__file__))
+ configFile = 'setup.xml'
+ loglevel = 'INFO'
+ logObj.getlogger().info("Test program Started : {}".format(datetime.now()))
+ try:
+ argv = sys.argv[1:]
+ opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel'])
+ for opt, args in opts:
+ if opt in ('-c', '--setup'):
+ configFile = args
+ if opt in ('-l', '--loglevel'):
+ loglevel = args
+ except getopt.GetoptError as err:
+ logObj.getlogger().error('#{}#'.format(err))
+
+ if loglevel != 'INFO':
+ logObj.setLogLevel(loglevel)
+
+ # Try to parse the setup.xml file
+ logObj.getlogger().debug("parse config file: {}". format(configFile))
+ try:
+ xmlObj = XMLSetup(os.path.join(test_abspath, configFile))
+ except XMLParser.ParseError as error:
+ # Error
+ execute_station_error("Error parsing {} file {}".format(configFile, error.msg))
+
+ # Try to connect to the DB, according to setup.xml configuration
+ if not psdbObj.open(xmlObj):
+ # Error
+ execute_station_error("Cannot open DB connection")
+ else:
+ logObj.getlogger().debug("database connection ok: {}".format(psdbObj.getConfig()))
+
+ # get station name
+ globalVar.station = socket.gethostname()
+ logObj.getlogger().info("Test Station: {}".format(globalVar.station))
+
+ if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1':
+ AutoTest = True
+
+ OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0"))
+ delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0")
+
+ logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot))
+
+ # Check if current state is "WAIT_TEST_START". if not, waits here
+ if xmlObj.getKeyVal("test_main", "overrideCheckWaitStart", "0") == "0":
+ check_first_state()
+ else:
+ psdbObj.setDevelStationState(globalVar.station, "WAIT_TEST_START")
+
+ # Change state to "TESTS_CHECKING_ENV"
+ set_next_state(StationStates.TESTS_CHECKING_ENV.name)
+
+ # execute main program
+ execute_test()
+
+ exit(0)