summaryrefslogtreecommitdiff
path: root/test-cli/test/helpers
diff options
context:
space:
mode:
authorManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
committerManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
commitcf19bfe18cbd283b188a858ee1629f9909c924f4 (patch)
tree1efb23519727130058401df090ab1b5f4cc8ba99 /test-cli/test/helpers
parentb6932fbaf898724ae87c29f8965621610f377084 (diff)
parentd5b273a3b58a250742049df4ca0ef0ba54f53d33 (diff)
downloadboard-cf19bfe18cbd283b188a858ee1629f9909c924f4.zip
board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.gz
board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.bz2
Merge branch 'sopa-test'rel.0.1sopa-test
Diffstat (limited to 'test-cli/test/helpers')
-rw-r--r--test-cli/test/helpers/DiskHelpers.py9
-rw-r--r--test-cli/test/helpers/__init__.pycbin122 -> 132 bytes
-rw-r--r--test-cli/test/helpers/amper.py154
-rw-r--r--test-cli/test/helpers/button_script.sh4
-rw-r--r--test-cli/test/helpers/camara.py116
-rw-r--r--test-cli/test/helpers/changedir.py14
-rw-r--r--test-cli/test/helpers/cmdline.py28
-rw-r--r--test-cli/test/helpers/cv_display_test.py28
-rw-r--r--test-cli/test/helpers/detect.py61
-rw-r--r--test-cli/test/helpers/finisher.py151
-rw-r--r--test-cli/test/helpers/get_dieid.py43
-rw-r--r--test-cli/test/helpers/globalVariables.py3
-rw-r--r--test-cli/test/helpers/gpio.py55
-rw-r--r--test-cli/test/helpers/int_registers.py68
-rw-r--r--test-cli/test/helpers/iseelogger.py62
-rw-r--r--test-cli/test/helpers/iw.py124
-rw-r--r--test-cli/test/helpers/md5.py8
-rw-r--r--test-cli/test/helpers/plc.py82
-rw-r--r--test-cli/test/helpers/psqldb.py44
-rw-r--r--test-cli/test/helpers/qrreader.py123
-rw-r--r--test-cli/test/helpers/sdl.py126
-rw-r--r--test-cli/test/helpers/setup_xml.py50
-rw-r--r--test-cli/test/helpers/syscmd.py72
-rw-r--r--test-cli/test/helpers/testsrv_db.py262
-rw-r--r--test-cli/test/helpers/uboot_flasher.py0
-rw-r--r--test-cli/test/helpers/usb.py270
-rw-r--r--test-cli/test/helpers/utils.py68
27 files changed, 1676 insertions, 349 deletions
diff --git a/test-cli/test/helpers/DiskHelpers.py b/test-cli/test/helpers/DiskHelpers.py
new file mode 100644
index 0000000..79b661e
--- /dev/null
+++ b/test-cli/test/helpers/DiskHelpers.py
@@ -0,0 +1,9 @@
+import stat
+import os
+
+
+def disk_exists(path):
+ try:
+ return stat.S_ISBLK(os.stat(path).st_mode)
+ except:
+ return False
diff --git a/test-cli/test/helpers/__init__.pyc b/test-cli/test/helpers/__init__.pyc
index 7d1c907..01014e2 100644
--- a/test-cli/test/helpers/__init__.pyc
+++ b/test-cli/test/helpers/__init__.pyc
Binary files differ
diff --git a/test-cli/test/helpers/amper.py b/test-cli/test/helpers/amper.py
new file mode 100644
index 0000000..ea719f6
--- /dev/null
+++ b/test-cli/test/helpers/amper.py
@@ -0,0 +1,154 @@
+import serial
+import scanf
+
+
+class Amper(object):
+ __ser = None
+ __port = '/dev/ttyUSB0'
+ __speed = 115200
+ __parity = None
+ __rtscts = 0
+ __timeout = 1
+ __isThere = False
+ __version = None
+ __voltage = 0.0
+ __current = 0.0
+ __alarm_condition = 0
+
+ def __init__(self, port='/dev/ttyUSB0', serial_speed=115200, parity=serial.PARITY_NONE, rtscts=0):
+ self.__port = port
+ self.__speed = serial_speed
+ self.__parity = parity
+ self.__rtscts = rtscts
+
+ def open(self):
+ try:
+ if self.__ser is not None:
+ self.close()
+ self.__ser = serial.Serial(port=self.__port,
+ baudrate=self.__speed,
+ timeout=self.__timeout,
+ parity=self.__parity,
+ rtscts=self.__rtscts,
+ bytesize=serial.EIGHTBITS,
+ stopbits=serial.STOPBITS_ONE,
+ xonxoff=0)
+
+ self.__ser.flushInput()
+ self.__ser.flushOutput()
+ self.__ser.break_condition = False
+ return True
+ except serial.SerialException as err:
+ print(err)
+ return False
+
+ def close(self):
+ if self.__ser is not None:
+ self.__ser.close()
+ self.__ser = None
+ self.__isThere = False
+ self.__version = None
+ self.__voltage = 0.0
+ self.__current = 0.0
+ self.__alarm_condition = 0
+
+ def __write(self, data):
+ data += "\n\r"
+ dat = data.encode('ascii')
+ self.__ser.write(dat)
+
+ def __read(self):
+ return self.__ser.readline().decode().rstrip()
+
+ def __set_voltage_underlimit(self, under_limit):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+VIN_UV_W_LIM={}'.format(under_limit))
+ dat = self.__read()
+ print(dat)
+
+ def __get_voltage_underlimit(self):
+ self.__write('AT+VIN_UV_W_LIM=?')
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ return res[0]
+
+ def __set_voltage_overlimit(self, over_limit):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+VIN_OV_W_LIM={}'.format(over_limit))
+ dat = self.__read()
+ print(dat)
+
+ def __get_voltage_overlimit(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+VIN_OV_W_LIM?')
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ return res[0]
+
+ def __set_current_overlimit(self, over_limit):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+IOUT_W_LIM={}'.format(over_limit))
+ dat = self.__read()
+ print(dat)
+
+ def __get_current_overlimit(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('AT+IOUT_W_LIM?')
+ dat = self.__read()
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ return res[0]
+
+ def hello(self):
+ self.__isThere = False
+ if self.__ser is None:
+ return False
+ self.__write('at+ver?')
+ dat = self.__read()
+ res = scanf.scanf("ISEE Amper v%3c", dat)
+ if res is None:
+ return False
+ self.__isThere = True
+ self.__version = res[0]
+ return True
+
+ def getVersion(self):
+ return self.__version
+
+ def getVoltage(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('at+vin?')
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ self.__voltage = res[0]
+ # print(self.__voltage)
+ return self.__voltage
+ else:
+ return None
+
+ def getCurrent(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('at+in?')
+ dat = self.__read()
+ res = scanf.scanf("%f", dat)
+ self.__current = res[0]
+ # print(self.__current)
+ else:
+ return None
+ return self.__current
+
+ def getAlarm(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('at+st_mfr?')
+ dat = self.__read()
+ # print(dat)
+ self.__alarm_condition = (int(dat) & 0x0F)
+ # print(self.__alarm_condition)
+ return self.__alarm_condition
+
+ def clearAlarm(self):
+ if self.__isThere and self.__ser is not None:
+ self.__write('at+CLRFAULT')
+ dat = self.__read()
+ # print(dat)
+
diff --git a/test-cli/test/helpers/button_script.sh b/test-cli/test/helpers/button_script.sh
deleted file mode 100644
index 6908f22..0000000
--- a/test-cli/test/helpers/button_script.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/usr/bin/env bash
-i2cset -f -y 1 0x2d 0x40 0x31
-i2cset -f -y 1 0x2d 0x50 0xff
-i2cget -f -y 1 0x2d 0x50
diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py
new file mode 100644
index 0000000..b23df74
--- /dev/null
+++ b/test-cli/test/helpers/camara.py
@@ -0,0 +1,116 @@
+import cv2
+import sh
+
+class Camara(object):
+ __device_name = None
+ __device = None
+ __setupScriptPath = ''
+ __w = 1280
+ __h = 720
+ __contrast = 0.0
+ __brightness = 0.0
+ __saturation = 55.0
+ __hue = 0.0
+ __exposure = 166
+
+ def __init__(self, setup_script_path, device="video0", width=1280, height=720):
+ self.__device_name = device
+ self.__w = width
+ self.__h = height
+ self.__setupScriptPath = setup_script_path
+
+ def Close(self):
+ if self.__device is not None:
+ del self.__device
+ self.__device = None
+
+ def Open(self):
+ self.Close()
+ self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name))
+ if self.__device.isOpened():
+ self.__configure()
+ return True
+ return False
+
+ def getSize(self):
+ return self.__w, self.__h
+
+ def setSize(self, w, h):
+ if self.__device is not None and self.__device.isOpened():
+ self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w)
+ self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h)
+ else:
+ self.__w = w
+ self.__h = h
+
+ def setContrast(self, newVal):
+ if self.__device.isOpened():
+ self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal)
+ else:
+ self.__contrast = newVal
+
+ def getContrast(self):
+ return self.__contrast
+
+ def setBrightness(self, newVal):
+ if self.__device.isOpened():
+ self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal)
+ else:
+ self.__brightness = newVal
+
+ def getBrightness(self):
+ return self.__brightness
+
+ def __configure(self):
+ self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w)
+ self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h)
+ sh.bash(self.__setupScriptPath)
+
+ def __setCamVar(self, key, val):
+ valold = cv2.VideoCapture.get(self.__device, key)
+ if valold != val:
+ cv2.VideoCapture.set(self.__device, key, val)
+ t = cv2.VideoCapture.get(self.__device, key)
+ return t
+ return val
+
+ def __getCamVar(self, key):
+ return cv2.VideoCapture.get(self.__device, key);
+
+ def getFrameCount(self):
+ return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE);
+
+ def getFrame(self):
+ if self.__device.isOpened():
+ retval, image = self.__device.read()
+ if retval:
+ return image
+ return None
+ else:
+ return None
+
+ def getImageSize(self, image):
+ if hasattr(image, 'shape'):
+ return image.shape[1], image.shape[0]
+ else:
+ return (0, 0, 0)
+
+ def showFrame(self, name, frame, w=False, maxTime=3000):
+ cv2.imshow(name, frame)
+ if w:
+ if maxTime == -1:
+ cv2.waitKey()
+ else:
+ cv2.waitKey(maxTime)
+
+ def saveFrame(self, fileName, Frame):
+ cv2.imwrite(fileName, Frame)
+
+ def readImage(self, filename):
+ return cv2.imread('{}'.format(filename), 0)
+
+ def destroyWindow(self, name):
+ cv2.destroyWindow(name)
+
+ def closeWindows(self):
+ cv2.destroyAllWindows() \ No newline at end of file
diff --git a/test-cli/test/helpers/changedir.py b/test-cli/test/helpers/changedir.py
new file mode 100644
index 0000000..fad9ade
--- /dev/null
+++ b/test-cli/test/helpers/changedir.py
@@ -0,0 +1,14 @@
+import os
+
+
+class changedir:
+ """Context manager for changing the current working directory"""
+ def __init__(self, newPath):
+ self.newPath = os.path.expanduser(newPath)
+
+ def __enter__(self):
+ self.savedPath = os.getcwd()
+ os.chdir(self.newPath)
+
+ def __exit__(self, etype, value, traceback):
+ os.chdir(self.savedPath) \ No newline at end of file
diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py
new file mode 100644
index 0000000..db94a1c
--- /dev/null
+++ b/test-cli/test/helpers/cmdline.py
@@ -0,0 +1,28 @@
+
+class LinuxKernelCmd:
+
+ __kernel_vars = {}
+
+ def __init__(self):
+ self.parse_kernel_cmdline()
+
+ def parse_kernel_cmdline(self):
+ cmdline = open('/proc/cmdline', 'rb').read().decode()
+ cmd_array = cmdline.split()
+ i = 0
+ for f in cmdline.split():
+ pos = f.find('=')
+ if pos == -1:
+ cmd_array[i - 1] = cmd_array[i - 1] + " " + f
+ cmd_array.remove(cmd_array[i])
+ else:
+ i += 1
+ self.__kernel_vars = {}
+ for f in cmd_array:
+ dat = f.split('=')
+ self.__kernel_vars[dat[0]] = dat[1]
+
+ def getkvar(self, vName, vdefault):
+ if vName in self.__kernel_vars:
+ return self.__kernel_vars[vName]
+ return vdefault
diff --git a/test-cli/test/helpers/cv_display_test.py b/test-cli/test/helpers/cv_display_test.py
index b54a698..3a3004f 100644
--- a/test-cli/test/helpers/cv_display_test.py
+++ b/test-cli/test/helpers/cv_display_test.py
@@ -14,8 +14,7 @@ def pattern_detect(cam_device=0):
# RETURN 0 only if the test is ok
msg="0"
# Capture the corresponding camera device [0,1]
- #capture = cv2.VideoCapture(0)
- capture = cv2.VideoCapture("/dev/v4l/by-id/usb-Creative_Technology_Ltd._Live__Cam_Sync_HD_VF0770-video-index0")
+ capture = cv2.VideoCapture(0)
try:
_, image = capture.read()
except:
@@ -64,12 +63,10 @@ def pattern_detect(cam_device=0):
# After testing the gamma factor correction is computed with the green color
# gamma = gamma_factor / blue+red into green part
# gamma factor = 50-100
- #gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1))
- gamma=1
+ gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1))
# Adjust the image acording to this gamma value
adjusted = adjust_gamma(image, gamma=gamma)
-# adjusted=image
- cv2.imwrite( "/home/root/result_hdmi_img.jpg", adjusted);
+
# Calculate again the average color using the gamma adjusted image
# Crop the gamma adjusted image for wach color section
red_cal = adjusted[y1:y2, xr1:xr2]
@@ -109,19 +106,14 @@ def pattern_detect(cam_device=0):
mask_r = mask0 + mask1
# mask_r = cv2.inRange(hsv, lower_red, upper_red)
# Perform a morphological open to expand
-# kernel = np.ones((5, 5), np.uint8)
-# closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel)
-# closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel)
-# closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel)
+ kernel = np.ones((15, 15), np.uint8)
+ closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel)
+ closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel)
+ closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel)
# Count the number of pixels that are not of the corresponding color (black)
-# count_r = cv2.countNonZero(closing_r)
-# count_b = cv2.countNonZero(closing_b)
-# count_g = cv2.countNonZero(closing_g)
- #-----------
- count_r = cv2.countNonZero(mask_r)
- count_b = cv2.countNonZero(mask_b)
- count_g = cv2.countNonZero(mask_g)
- #-------
+ count_r = cv2.countNonZero(closing_r)
+ count_b = cv2.countNonZero(closing_b)
+ count_g = cv2.countNonZero(closing_g)
if (count_r < 5):
msg = "RED COUNT FAIL"
return msg
diff --git a/test-cli/test/helpers/detect.py b/test-cli/test/helpers/detect.py
new file mode 100644
index 0000000..193dabf
--- /dev/null
+++ b/test-cli/test/helpers/detect.py
@@ -0,0 +1,61 @@
+import cv2
+import numpy as np
+
+
+class Detect_Color(object):
+ oFrame = None
+ img_hsv = None
+ __red_lower1 = [0, 50, 20]
+ __red_upper1 = [5, 255, 255]
+ __red_lower2 = [175, 50, 20]
+ __red_upper2 = [180, 255, 255]
+ __blue_lower = [110, 50, 50]
+ __blue_upper = [130, 255, 255]
+ __green_lower = [36, 25, 25]
+ __green_upper = [86, 255, 255]
+
+ def __init__(self, frame):
+ self.oFrame = frame
+ self.__hist()
+
+ def __hist(self):
+ self.img_hsv = cv2.cvtColor(self.oFrame, cv2.COLOR_BGR2HSV)
+
+ def getRed(self):
+ return self.__detect_two_areas([0, 50, 20], [5, 255, 255], [175, 50, 20], [180, 255, 255])
+
+ def getBlue(self):
+ return self.__detect_one_area([110, 50, 50], [130, 255, 255])
+
+ def getGreen(self):
+ return self.__detect_one_area([36, 25, 25], [86, 255, 255])
+
+ def __detect_one_area(self, lower, upper):
+ a_lower = np.array(lower)
+ a_upper = np.array(upper)
+ c_mask = cv2.inRange(self.img_hsv, a_lower, a_upper)
+ croped = cv2.bitwise_and(self.oFrame, self.oFrame, mask=c_mask)
+ mean_v = cv2.mean(self.oFrame, mask=c_mask)
+ mean = {}
+ mean['R'] = mean_v[2]
+ mean['G'] = mean_v[1]
+ mean['B'] = mean_v[0]
+ count = cv2.countNonZero(c_mask)
+ return c_mask, croped, mean, count
+
+ def __detect_two_areas(self, lower1, upper1, lower2, upper2):
+ a1_lower = np.array(lower1)
+ a1_upper = np.array(upper1)
+ a2_lower = np.array(lower2)
+ a2_upper = np.array(upper2)
+ c_mask1 = cv2.inRange(self.img_hsv, a1_lower, a1_upper)
+ c_mask2 = cv2.inRange(self.img_hsv, a2_lower, a2_upper)
+ c_mask = cv2.bitwise_or(c_mask1, c_mask2)
+ croped = cv2.bitwise_and(self.oFrame, self.oFrame, mask=c_mask)
+ mean_v = cv2.mean(self.oFrame, mask=c_mask)
+ mean = {}
+ mean['R'] = mean_v[2]
+ mean['G'] = mean_v[1]
+ mean['B'] = mean_v[0]
+ count = cv2.countNonZero(c_mask)
+ return c_mask, croped, mean, count
diff --git a/test-cli/test/helpers/finisher.py b/test-cli/test/helpers/finisher.py
deleted file mode 100644
index 73142d9..0000000
--- a/test-cli/test/helpers/finisher.py
+++ /dev/null
@@ -1,151 +0,0 @@
-from test.helpers.syscmd import SysCommand
-from test.helpers.globalVariables import globalVar
-import binascii
-import uuid
-import subprocess
-from PIL import Image, ImageDraw, ImageFont
-import qrcode
-import PIL
-
-class Finisher(object):
- __muhb = None
- __final_to_burn_to_eeprom = None
- __qr_image = None
-
- def __init__(self, muhb = None):
- self.__muhb = muhb
- self.__final_to_burn_to_eeprom = bytearray()
- self.__qr_image = None
- pass
-
- def eeprom_burn(self):
- ## Create binary file
- str_cmd2 = "find /sys/ -iname 'eeprom'"
- eeprom_location = SysCommand("eeprom_location", str_cmd2)
- if eeprom_location.execute() == 0:
- raw_out = eeprom_location.getOutput()
- if raw_out == "":
- self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?")
- eeprom = raw_out.decode('ascii')
- eeprom = eeprom.strip('\n')
- ## push binary data to eeprom like if working with files
- file2 = open(eeprom, "w+b")
- file2.write(self.__final_to_burn_to_eeprom)
- else:
- self.fail("failed: could not complete find eeprom command")
-
- def generate_qr_stamp(self):
- # Generate QR to put in a stamp
- qr = qrcode.QRCode(
- version=1,
- error_correction=qrcode.constants.ERROR_CORRECT_L,
- box_size=10,
- border=4,
- )
- # Use board_uuid to generate the QR code
- qr.add_data(globalVar.g_uuid)
- qr.make(fit=True)
- # Save QR as image stamp.png
- img = qr.make_image()
- # Store QR as a class atrib
- self.__qr_image = img
- img.save('/home/root/stamp.png')
-
- def print_stamp(self):
- # Print stamp by sending a cmd to the printer
- str_cmd3 = "lpr -o scaling=4 stamp.png".format(self.__display)
- printstamp = SysCommand("printstamp", str_cmd3)
- if printstamp.execute() != 0:
- self.fail("failed: could not print stamp")
-
- def generate_screen(self):
- # Generate green image with board uuid and QR maybe pasted on top of green image
- # Define image size
- W = 1250
- H = 703
- # Create blank rectangle to write on
- image = Image.new('RGB', (W, H), (46, 204, 113, 0))
- draw = ImageDraw.Draw(image)
- message = "TEST OK\n\n" + "Board UUID: "+ globalVar.g_uuid
- font = ImageFont.truetype('/usr/share/fonts/truetype/dejavuu/DejaVuSans.ttf', 40)
- # Calculate the width and height of the text to be drawn, given font size
- w, h = draw.textsize(message, font=font)
- # Write the text to the image, where (x,y) is the top left corner of the text
- draw.text(((W-w)/2,(H-h)/2), message, align='center', font=font)
- #draw.image(self.__qr_image)
- image.save('/home/root/test/files/test_ok.png')
-
- def show_result_screen(self):
- # If test OK show test_OK.png located in /home/root, If test fail show test_fail.png, located in /home/root/test/files/test_KO.png
- if globalVar.fstatus:
- str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ok.png".format('fb0')
- else:
- str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ko.png".format('fb0')
-
- display_image = SysCommand("display_image", str_cmd4)
- #print(display_image.execute())
- if display_image.execute() != -1:
- self.fail("failed: could not display the image")
-
-
- def end_ok(self):
- # Burn retrieved igep eeprom struct
- new = self.__muhb[0][0]
-
- # Convert from string to hex
- hnew = new.encode()
- # Magic_id and default crc32 0x6d, 0x6a, 0x6d, 0xe4 with endianess changed so that u-boot loads it correctly
- # IF magic ever changes this magic_id should be changed. At the end always magic_id of test and u-boot should
- # be the same
- magic_id = bytes([0xe4, 0x6d, 0x6a, 0x6d])
- default_igep_crc32 = bytes([0x00, 0x00, 0x00, 0x00])
-
- # Create bytearray
- to_calculate = bytearray()
- # Build the final hex binary for crc32 operation
- to_calculate.extend(magic_id)
- to_calculate.extend(default_igep_crc32)
- to_calculate.extend(hnew)
-
- # Calculate crc32!
- new_crc32 = binascii.crc32(to_calculate)
- hnew_crc32 = new_crc32.to_bytes(4, byteorder="little")
-
- # Recreate final eeprom struct in bytearray
- self.__final_to_burn_to_eeprom = bytearray()
- self.__final_to_burn_to_eeprom.extend(magic_id)
- self.__final_to_burn_to_eeprom.extend(hnew_crc32)
- self.__final_to_burn_to_eeprom.extend(hnew)
- self.eeprom_burn()
-
- # Generate QR stamp
- self.generate_qr_stamp()
- # Send print stamp command
- #self.print_stamp()
- # Generate green image with board uuid and QR maybe pasted on top of green image
- self.generate_screen()
- # Show test_ok.png image in screen
- self.show_result_screen()
-
- def end_fail(self):
- # Burn igep eeprom bug struct
- hnew = self.__muhb.encode()
- default_fail = bytes([0xD0, 0xBA, 0xD0, 0xBA])
- self.__final_burn_to_eeprom = bytearray()
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(hnew)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.__final_to_burn_to_eeprom.extend(default_fail)
- self.eeprom_burn()
- # Show test_ko.png image in screen
- self.show_result_screen() \ No newline at end of file
diff --git a/test-cli/test/helpers/get_dieid.py b/test-cli/test/helpers/get_dieid.py
deleted file mode 100644
index b20f143..0000000
--- a/test-cli/test/helpers/get_dieid.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import mmap
-import os
-import struct
-MAP_MASK = mmap.PAGESIZE - 1
-WORD = 4
-def read(addr):
- """ Read from any location in memory
- Returns the readed value in hexadecimal format
- Keyword arguments:
- - addr: The memory address to be readed.
- """
- fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC)
- # Map one page
- mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK)
- mm.seek(addr & MAP_MASK)
- retval = struct.unpack('I', mm.read(WORD))
- mm.close()
- os.close(fd)
- return "%08X" % retval[0]
-
-def getRegisters(model):
- if model.find("IGEP0046") == 0:
- registers = [0x021BC420, 0x021BC410]
- elif model.find("IGEP0000") == 0:
- registers = [0x021BC420, 0x021BC410]
- elif model.find("IGEP0034") == 0 or model.find("SOPA0000") == 0:
- registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C]
- elif model.find("OMAP3") == 0:
- registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218]
- elif model.find("OMAP5") == 0:
- registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200]
- return registers
-
-def genDieid(modelid):
- registers=getRegisters(modelid)
- id=""
- for i in range(len(registers)):
- id=id+(read(registers[i]))
- return id
-
-#if __name__ == "__main__":
- #registers = [0x021BC420, 0x021BC410]
- #print(main(registers))
diff --git a/test-cli/test/helpers/globalVariables.py b/test-cli/test/helpers/globalVariables.py
index c4d8358..baaae57 100644
--- a/test-cli/test/helpers/globalVariables.py
+++ b/test-cli/test/helpers/globalVariables.py
@@ -6,5 +6,4 @@ def globalVar():
g_mid = ""
outdata = "NONE"
station = ""
- fstatus = ""
- gdisplay = None \ No newline at end of file
+ taskid_ctl = ""
diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py
new file mode 100644
index 0000000..f127f2b
--- /dev/null
+++ b/test-cli/test/helpers/gpio.py
@@ -0,0 +1,55 @@
+import os
+
+class gpio (object):
+ gpioNum = None
+
+ def __init__(self, gpioNum, dir, val):
+ self.gpioNum = gpioNum
+ self.__export(gpioNum)
+ self.__set_dir(gpioNum, dir)
+ if dir == 'out':
+ self.__set_value(gpioNum, val)
+
+ def set_val(self, value):
+ self.__set_value( self.gpioNum, value)
+
+
+ def __export(self, gpio_number):
+ if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)):
+ try:
+ f = open("/sys/class/gpio/export", "w", newline="\n")
+ f.write(str(gpio_number))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __unexport(self, gpio_number):
+ if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)):
+ try:
+ f = open("/sys/class/gpio/unexport", "w", newline="\n")
+ f.write(str(gpio_number))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __set_dir(self, gpio_number, dir):
+ try:
+ f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n")
+ val = f.readline()
+ if val != dir:
+ f.write(dir)
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True
+
+ def __set_value(self, gpio_number, value):
+ try:
+ f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n")
+ f.write(str(value))
+ f.close()
+ except IOError:
+ return False, '{}'.format(IOError.errno)
+ return True \ No newline at end of file
diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py
new file mode 100644
index 0000000..133387c
--- /dev/null
+++ b/test-cli/test/helpers/int_registers.py
@@ -0,0 +1,68 @@
+import mmap
+import os
+import struct
+import sh
+
+MAP_MASK = mmap.PAGESIZE - 1
+WORD = 4
+
+
+def read(addr):
+ """ Read from any location in memory
+ Returns the readed value in hexadecimal format
+ Keyword arguments:
+ - addr: The memory address to be readed.
+ """
+ fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC)
+ # Map one page
+ mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK)
+ mm.seek(addr & MAP_MASK)
+ retval = struct.unpack('I', mm.read(WORD))
+ mm.close()
+ os.close(fd)
+ return "%08X" % retval[0]
+
+def imx8m_readid():
+ f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n")
+ val = f.readline()
+ f.close()
+ return val.rstrip()
+
+
+def get_die_id(modelid):
+ dieid = ""
+
+ # get registers
+ if modelid.find("IGEP0046") == 0:
+ registers = [0x021BC420, 0x021BC410]
+ elif modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0:
+ # registers: mac_id0_lo, mac_id0_hi, mac_id1_lo, mac_id1_hi
+ registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C]
+ elif modelid.find("OMAP3") == 0 or modelid.find("IGEP0020") == 0 or modelid.find("IGEP0030") == 0:
+ registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218]
+ elif modelid.find("OMAP5") == 0:
+ registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200]
+ elif modelid.find("IGEP0048") == 0:
+ return imx8m_readid()
+ else:
+ raise Exception('modelid not defined: {}, modelid')
+ for rg in registers:
+ dieid = dieid + read(rg)
+ return dieid
+
+
+def get_internal_mac(modelid):
+ mac = None
+
+ if modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0:
+ # # registers: mac_id0_lo, mac_id0_hi
+ # registers = [0x44e10630, 0x44e10634]
+ # mac = ""
+ # for rg in registers:
+ # mac = mac + read(rg)
+ # #erase trailing zeros
+ # mac = mac[4::1]
+ # # To be finished...
+ mac = sh.cat("/sys/class/net/eth0/address").strip()
+
+ return mac
diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py
new file mode 100644
index 0000000..4b1087c
--- /dev/null
+++ b/test-cli/test/helpers/iseelogger.py
@@ -0,0 +1,62 @@
+import logging
+import logging.handlers
+import datetime
+
+class ISEE_Logger(object):
+ __logger = None
+ __logHandler = None
+ __formater = None
+ __logHandlerConsole = None
+
+ def __init__(self, level=logging.INFO):
+ # Create syslog logger
+ self.__logger = logging.getLogger('ISEE_logger')
+ self.__logger.setLevel(level)
+ self.__logHandler = logging.handlers.SysLogHandler('/dev/log')
+ self.__logHandlerConsole = logging.StreamHandler()
+ self.__formater = logging.Formatter('Python: { "loggerName":"%(name)s", "timestamp":"%(asctime)s", "pathName":"%(pathname)s", "logRecordCreationTime":"%(created)f", "functionName":"%(funcName)s", "levelNo":"%(levelno)s", "lineNo":"%(lineno)d", "time":"%(msecs)d", "levelName":"%(levelname)s", "message":"%(message)s"}')
+ self.__logHandler.formatter = self.__formater
+ self.__logHandlerConsole.formatter = self.__formater
+ self.__logger.addHandler(self.__logHandler)
+ self.__logger.addHandler(self.__logHandlerConsole)
+
+ def setLogLevel(self, level):
+ if level.upper() == "DEBUG":
+ nlevel = logging.DEBUG
+ elif level.upper() == "INFO":
+ nlevel = logging.INFO
+ elif level.upper() == "ERROR":
+ nlevel = logging.ERROR
+ elif level.upper() == "WARNING":
+ nlevel = logging.WARNING
+ else:
+ nlevel = logging.DEBUG
+
+ self.__logger.setLevel(nlevel)
+
+ def getlogger(self):
+ return self.__logger
+
+class MeasureTime:
+ __difference = None
+ __first_time = None
+ __later_time = None
+
+ def __init__(self):
+ self.__first_time = datetime.datetime.now()
+
+ def start(self):
+ self.first_time = datetime.datetime.now()
+
+ def stop(self):
+ self.__later_time = datetime.datetime.now()
+ self.__difference = self.__later_time - self.__first_time
+ return self.__difference.total_seconds()
+
+ def getTime(self):
+ return self.__difference.total_seconds()
+
+
+global logObj
+logObj = ISEE_Logger(logging.INFO)
+
diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py
new file mode 100644
index 0000000..8e29448
--- /dev/null
+++ b/test-cli/test/helpers/iw.py
@@ -0,0 +1,124 @@
+from sh import iw
+from scanf import scanf
+
+class iw_scan:
+ __interface = None
+ __raw_unparsed = []
+ __raw = None
+
+ def __init__(self, interface='wlan0'):
+ self.__interface = interface
+
+ def scan(self):
+ self.__raw_unparsed = []
+ self.__raw = iw('{}'.format(self.__interface), 'scan')
+ if self.__raw.exit_code == 0:
+ self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n')
+ return True
+ return False
+
+ def getRawData (self):
+ return self.__raw_unparsed
+
+ def getInterface(self):
+ return self.__interface
+
+ def getRawObject(self):
+ return self.__raw
+
+
+class iwScan:
+ __iw_scan = None
+ __station_list = {}
+
+ def __init__(self, interface='wlan0'):
+ self.__iw_scan = iw_scan(interface)
+
+ def scan(self):
+ if self.__iw_scan.scan():
+ self.__parse_scan(self.__iw_scan.getRawData())
+ return True
+ return False
+
+ def getLastError(self):
+ rObject = self.__iw_scan.getRawObject()
+ if rObject.exit_code != 0:
+ return rObject.stderr
+ return ''
+
+ def __Parse_BBS(self, line):
+ data = {}
+ res = scanf("BSS %s(on %s) -- %s", line)
+ if res == None:
+ res = scanf("BSS %s(on %s)", line)
+ data["MAC"] = res[0]
+ data["DEV"] = res[1]
+ if len(res) == 3:
+ data["ASSOCIATED"] = 'YES'
+ else:
+ data["ASSOCIATED"] = 'NO'
+ return data
+
+ def __Parse_T(self, id, OnNotFoundAttr ,line):
+ data = {}
+ res = scanf("{}: %s".format(id), line)
+ if res == None:
+ data[id.upper()] = OnNotFoundAttr
+ else:
+ data[id.upper()] = res[0]
+ return data
+
+ def __Parse_X(self, line):
+ data = {}
+ res = scanf("DS Parameter set: channel %s", line)
+ if res == None:
+ data['CHANNEL'] = '-1'
+ else:
+ data['CHANNEL'] = res[0]
+ return data
+
+ def __block_stationlist(self, rawdata):
+ list = {}
+ count = 0
+ for line in rawdata:
+ idx = line.find('BSS')
+ if idx != -1 and idx == 0:
+ if len(list) > 0:
+ count += 1
+ self.__station_list[count] = list
+ list = self.__Parse_BBS(line)
+ elif line.find('SSID:') != -1:
+ list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)}
+ elif line.find('freq:') != -1:
+ list = {**list, **self.__Parse_T('freq', '', line)}
+ elif line.find('signal:') != -1:
+ list = {**list, **self.__Parse_T('signal', '', line)}
+ elif line.find('DS Parameter set:') != -1:
+ list = {**list, **self.__Parse_X(line)}
+ if count > 0:
+ count += 1
+ self.__station_list[count] = list
+
+ def __parse_scan(self, rawData):
+ self.__station_list = {}
+ self.__block_stationlist(rawData)
+ #print('{}'.format(self.__station_list))
+ #for i in self.__station_list:
+ # print(self.__station_list[i])
+
+ def findConnected (self):
+ for i in self.__station_list:
+ st = self.__station_list[i]
+ if st.get('ASSOCIATED', 'NO') == 'YES':
+ return True, self.__station_list[i]
+ return False, None
+
+ def findBySSID(self, ssid):
+ for i in self.__station_list:
+ st = self.__station_list[i]
+ if st.get('SSID', '') == ssid:
+ return True, self.__station_list[i]
+ return False, None
+
+ def getStations(self):
+ return self.__station_list \ No newline at end of file
diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py
new file mode 100644
index 0000000..9bc9e23
--- /dev/null
+++ b/test-cli/test/helpers/md5.py
@@ -0,0 +1,8 @@
+import hashlib
+
+def md5_file(fname):
+ hash_md5 = hashlib.md5()
+ with open(fname, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ return hash_md5.hexdigest() \ No newline at end of file
diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py
new file mode 100644
index 0000000..069058d
--- /dev/null
+++ b/test-cli/test/helpers/plc.py
@@ -0,0 +1,82 @@
+import sh
+from sh import flashcp
+from sh import flash_eraseall
+from sh import ErrorReturnCode
+from sh import Command
+from test.helpers.gpio import gpio
+import time
+
+
+class dcpPLC(object):
+ __nReset = None
+ __Phy = None
+ __Plc = None
+ __mtd_device = None
+ __factool = None
+ __myConfigTool = None
+
+ def __init__(self, plcfactool, mtd_device):
+ # default save firmware
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ self.__nRest = gpio('78', 'out', '1')
+ self.__factool = plcfactool
+ self.__mtd_device = mtd_device
+ self.__myConfigTool = Command(self.__factool)
+
+ def setSaveFirmwareMode(self):
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ self.__nRest = gpio('78', 'out', '1')
+
+ def setBootMode(self):
+ self.__nRest = gpio('78', 'out', '0')
+ self.__nRest = gpio('75', 'out', '1')
+ self.__nRest = gpio('69', 'out', '1')
+
+ def setPLCReset(self):
+ self.__nRest = gpio('75', 'out', '0')
+ self.__nRest = gpio('69', 'out', '0')
+ time.sleep(1)
+ self.__nRest = gpio('69', 'out', '1')
+ self.__nRest = gpio('75', 'out', '1')
+
+ def SaveFirmware(self, firmare):
+ self.setSaveFirmwareMode()
+ try:
+ flash_eraseall(self.__mtd_device)
+ flashcp(firmare, self.__mtd_device)
+ except ErrorReturnCode as Error:
+ return False, "plc flash firmware failed {} ".format(Error.exit_code)
+ return True, ''
+
+ def set_plc(self, var, value, password):
+ try:
+ res = self.__myConfigTool("-o", "SET", "-p", "{}={}".format(var, value), "-w", "{}".format(password))
+ print(res)
+ except ErrorReturnCode as Error:
+ return False, "set var failed {} {}".format(var, Error.exit_code)
+ return True, ''
+
+ def set_plc2(self, var1, value1, var2, value2, password):
+ try:
+ res = self.__myConfigTool("-o", "SET", "-p", "{}={}".format(var1, value1), "-p",
+ "{}={}".format(var2, value2), "-w", "{}".format(password))
+ print(res)
+ except ErrorReturnCode as Error:
+ return False, "set var failed {}".format(Error.exit_code)
+ return True, ''
+
+ def get_plc(self, var, value, password):
+ try:
+ self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password))
+ except ErrorReturnCode as Error:
+ return False, "set var failed {} {}".format(var, Error.exit_code)
+ return True, ''
+
+ def discover(self):
+ try:
+ self.__myConfigTool("-o", "DISCOVER")
+ except ErrorReturnCode:
+ return False
+ return True
diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py
index 26dd03d..0141414 100644
--- a/test-cli/test/helpers/psqldb.py
+++ b/test-cli/test/helpers/psqldb.py
@@ -1,34 +1,40 @@
import psycopg2
+import psycopg2.extras
+
class PgSQLConnection(object):
- """aaaaaaa"""
+ """Postgres Connection Object"""
__conection_object = None
__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171',
- 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}
+ 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}
- def __init__ (self):
-# self.__conection_object = None
-# if connect_str is not None:
-# self.__db_config = connect_str
-# else:
- self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171',
- 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}
+ def __init__(self, connect_str=None):
+ self.__conection_object = None
+ if connect_str is not None:
+ self.__db_config = connect_str
+ else:
+ self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171',
+ 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}
- def db_connect (self, connect_str):
+ def db_connect(self, connect_str=None):
result = False
try:
if connect_str == None:
self.__conection_object = psycopg2.connect(**self.__db_config)
else:
- self.__db_config = connect_str;
+ self.__db_config = connect_str
self.__conection_object = psycopg2.connect(**self.__db_config)
+
self.__conection_object.autocommit = True
result = True
except Exception as error:
print(error)
return result
+ def getConfig(self):
+ return self.__db_config
+
def db_execute_query(self, query):
cur = self.__conection_object.cursor()
cur.execute(query)
@@ -36,20 +42,26 @@ class PgSQLConnection(object):
cur.close()
return data
+ def db_upload_large_file(self, filepath):
+ # a new connection must be created to use large objects
+ __conn = psycopg2.connect(**self.__db_config)
+ lobj = __conn.lobject(oid=0, mode="rw", new_oid=0, new_file=filepath)
+ fileoid = lobj.oid
+ lobj.close()
+ __conn.commit()
+ __conn.close()
+ return fileoid
+
def commit(self):
self.__conection_object.commit()
-
def db_close(self):
if self.__conection_object is not None:
self.__conection_object.close()
del self.__conection_object
self.__conection_object = None
-
def __del__(self):
if self.__conection_object is not None:
self.__conection_object.close()
- #print("disconnected")
-
-
+ # print("disconnected")
diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py
new file mode 100644
index 0000000..51e5247
--- /dev/null
+++ b/test-cli/test/helpers/qrreader.py
@@ -0,0 +1,123 @@
+import evdev
+from evdev import InputDevice, categorize, ecodes
+import threading
+import time
+import selectors
+from selectors import DefaultSelector, EVENT_READ
+from test.helpers.iseelogger import logObj
+
+selector = selectors.DefaultSelector()
+
+qrdevice_list = [
+ "Honeywell Imaging & Mobility 1900",
+ "Manufacturer Barcode Reader",
+ "SM SM-2D PRODUCT HID KBW",
+ "Honeywell Imaging & Mobility 1400g"
+]
+
+qrkey_list = {'KEY_0': '0',
+ 'KEY_1': '1',
+ 'KEY_2': '2',
+ 'KEY_3': '3',
+ 'KEY_4': '4',
+ 'KEY_5': '5',
+ 'KEY_6': '6',
+ 'KEY_7': '7',
+ 'KEY_8': '8',
+ 'KEY_9': '9'
+ }
+
+
+class QRReader:
+ __qrstr = ""
+ __myReader = {}
+ __numdigits = 10
+ __dev = None
+
+ def __init__(self):
+ self.getQRlist()
+
+ def getQRlist(self):
+ devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
+ logObj.getlogger().debug("{}".format(devices))
+ for device in devices:
+ logObj.getlogger().debug("{}".format(device.name))
+ if device.name in qrdevice_list:
+ self.__myReader['NAME'] = device.name
+ # print(self.__myReader['NAME'])
+ self.__myReader['PATH'] = device.path
+ # print(self.__myReader['PATH'])
+ self.__myReader['PHYS'] = device.phys
+ # print(self.__myReader['PHYS'])
+
+ def IsQR(self):
+ return 'NAME' in self.__myReader
+
+ def getQRNumber(self):
+ return self.__qrstr
+
+ def openQR(self):
+ if self.IsQR():
+ self.closeQR()
+ self.__dev = InputDevice(self.__myReader['PATH'])
+ return True
+ return False
+
+ def closeQR(self):
+ if self.__dev:
+ del self.__dev
+ self.__dev = None
+
+ def readQR(self):
+ """" Sync Read up to numdigits """
+ count = 0
+ self.__qrstr = ""
+ if self.__dev:
+ self.__dev.grab()
+ for event in self.__dev.read_loop():
+ if event.type == evdev.ecodes.EV_KEY:
+ c_event = categorize(event)
+ if c_event.keycode in qrkey_list:
+ if c_event.keystate == 0:
+ self.__qrstr += qrkey_list[c_event.keycode]
+ count += 1
+ if count == self.__numdigits:
+ break
+ self.__dev.ungrab()
+ return True
+ return False
+
+ def wait_event(self, timeout):
+ selector.register(self.__dev, selectors.EVENT_READ)
+ while True:
+ events = selector.select(timeout);
+ if not events:
+ return False
+ else:
+ return True
+
+ def readQRasync(self, timeout):
+ count = 0
+ r = False
+ self.__qrstr = ""
+ if self.__dev:
+ self.__dev.grab()
+ if self.wait_event(timeout):
+ for event in self.__dev.read_loop():
+ if event.type == evdev.ecodes.EV_KEY:
+ c_event = categorize(event)
+ if c_event.keycode in qrkey_list:
+ if c_event.keystate == 0:
+ self.__qrstr += qrkey_list[c_event.keycode]
+ count += 1
+ if count == self.__numdigits:
+ r = True
+ break
+ self.__dev.ungrab()
+ return r
+ return False
+
+# qr = QRReader()
+# if qr.openQR():
+# print(qr.readQRasync(2))
+# qr.closeQR()
diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py
new file mode 100644
index 0000000..8131a53
--- /dev/null
+++ b/test-cli/test/helpers/sdl.py
@@ -0,0 +1,126 @@
+import sdl2.ext
+from sdl2 import *
+import os
+import time
+
+Colors = {
+ 'red': sdl2.ext.Color(255, 0, 0, 255),
+ 'green': sdl2.ext.Color(0, 255, 0, 255),
+ 'blue': sdl2.ext.Color(0, 0, 255, 255),
+ 'white': sdl2.ext.Color(255, 255, 255, 255),
+ 'black': sdl2.ext.Color(0, 0, 0, 255),
+}
+
+
+class SDL2(object):
+ __resources = None
+ __w = 1280
+ __h = 720
+ __winName = "noname"
+ __window = None
+ __factory = None
+ __renderer = None
+ __srender = None
+
+ def __init__(self, driver, display, w, h):
+ os.environ['SDL_VIDEODRIVER'] = "x11"
+ os.environ['DISPLAY'] = ":0"
+ # self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files')
+ sdl2.ext.init()
+ self.__createWindow()
+
+ def __createWindow(self):
+ self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0),
+ flags=(
+ SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED))
+ self.__renderer = sdl2.ext.Renderer(self.__window)
+ self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer)
+ self.__srender = self.__factory.create_sprite_render_system(self.__window)
+ self.__window.show()
+ SDL_ShowCursor(SDL_DISABLE)
+
+ # def createFont(self, fontName, fontSize, fontColor):
+ # return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor)
+
+ def WriteText(self, text, x, y, fontManager):
+ imText = self.__factory.from_text(text, fontmanager=fontManager)
+ self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1]))
+
+ def show(self):
+ self.__window.show()
+
+ def hide(self):
+ self.__window.hide()
+
+ def setColor(self, red, green, blue, alpha):
+ self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha)
+
+ def fillColor(self, red, green, blue, alpha):
+ self.setColor(red, green, blue, alpha)
+ self.__renderer.clear()
+
+ def fillbgColor(self, color, update=False):
+ if color in Colors:
+ self.__renderer.color = Colors[color]
+ self.__renderer.clear()
+ if update:
+ self.update()
+
+ def update(self):
+ self.__renderer.present()
+
+ # def showImage(self, filename):
+ # im = self.__factory.from_image(self.__resources.get_path(filename))
+ # self.__srender.render(im)
+
+
+class SDL2_Test(object):
+ __sdl2 = None
+
+ def __init__(self, driver, display, w, h):
+ self.__sdl2 = SDL2(driver, display, w, h)
+
+ def Clear(self):
+ self.__sdl2.fillbgColor('black', True)
+
+ def Paint(self, dcolor):
+ self.Clear()
+ self.__sdl2.fillbgColor(dcolor.lower(), True)
+
+
+class SDL2_Message(object):
+ __sdl2 = None
+ __fontManager_w = None
+ __fontManager_b = None
+
+ def __init__(self):
+ self.__sdl2 = SDL2()
+ self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white'])
+ self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black'])
+
+ def Clear(self):
+ self.__sdl2.fillbgColor('black', True)
+
+ def Paint(self, dcolor):
+ self.Clear()
+ self.__sdl2.fillbgColor(dcolor.lower(), True)
+
+ def __write_w(self, x, y, text):
+ self.__sdl2.WriteText(text, x, y, self.__fontManager_w)
+
+ def __write_b(self, x, y, text):
+ self.__sdl2.WriteText(text, x, y, self.__fontManager_b)
+
+ def setTestOK(self, board_uuid, qrdata):
+ self.Paint('GREEN')
+ self.__write_b(100, 50, 'TEST OK')
+ self.__write_b(100, 100, 'uuid={}'.format(board_uuid))
+ self.__write_b(100, 150, 'qr={}'.format(qrdata))
+ self.__sdl2.update()
+
+ def setTestERROR(self, error):
+ self.Paint('RED')
+ self.__write_w(100, 50, 'TEST FAILED')
+ self.__write_w(100, 100, '{}'.format(error))
+ self.__sdl2.update()
+
diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py
index 3fd9fd5..d61d1fb 100644
--- a/test-cli/test/helpers/setup_xml.py
+++ b/test-cli/test/helpers/setup_xml.py
@@ -1,22 +1,22 @@
import xml.etree.ElementTree as XMLParser
class XMLSetup (object):
- """aaaaa"""
+ """XML Setup Parser"""
__tree = None # Parser
__dbType = None # database connection required: PgSQLConnection
__dbConnectionRaw = None # Connection string in raw
__dbConnectionStr = None # Connection string to use in sql object connection
def __init__(self, filename):
- """aaaaa"""
+ """Parse the file in the constructor"""
self.__tree = XMLParser.parse(filename)
def __del__(self):
- """aaaaa"""
+ """Destructor do nothing"""
pass
- def getdbConnectionStr (self):
- """aaaaa"""
+ def getdbConnectionStr(self):
+ """XML to database connection string"""
if self.__dbConnectionRaw is not None:
return self.__dbConnectionRaw
@@ -29,12 +29,44 @@ class XMLSetup (object):
return None
- def getPostgresConnectionStr (self):
- """aaaaa"""
+ def getPostgresConnectionStr(self):
+ """Get Connection string """
str = self.__dbConnectionRaw
del str['type']
return str
- def getMysqlConnectionStr (self):
+ def getBoard(self, key, default):
+ for element in self.__tree.iter('board'):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
+
+ def getTempPath(self, key, default):
+ for element in self.__tree.iter('tmpPath'):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
+
+ def getKeyVal(self, tag, key, default):
+ for element in self.__tree.iter(tag):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
+
+ def gettagKey (self, xmltag, xmlkey):
"""aaaaa"""
- pass \ No newline at end of file
+ for element in self.__tree.iter(xmltag):
+ return element.attrib[xmlkey]
+
+ return None
+
+ def getUSBlist(self):
+ list = {}
+ count = 0
+ for elements in self.__tree.iter("usbdev"):
+ sublist = {}
+ for key,val in elements.items():
+ sublist[key] = val
+ list[count] = sublist
+ count += 1
+ return list \ No newline at end of file
diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py
index b579e39..7223774 100644
--- a/test-cli/test/helpers/syscmd.py
+++ b/test-cli/test/helpers/syscmd.py
@@ -1,6 +1,6 @@
import unittest
import subprocess
-from test.helpers.globalVariables import globalVar
+import threading
class TestSysCommand(unittest.TestCase):
@@ -10,7 +10,7 @@ class TestSysCommand(unittest.TestCase):
__outdata = None
__outtofile = False
- def __init__(self, testname, testfunc, str_cmd, outtofile = False):
+ def __init__(self, testname, testfunc, str_cmd, outtofile=False):
""" init """
super(TestSysCommand, self).__init__(testfunc)
self.__str_cmd = str_cmd
@@ -42,7 +42,7 @@ class TestSysCommand(unittest.TestCase):
else:
res = -3
outdata = completed.stdout
- self.longMessage=str(outdata).replace("'","")
+ self.longMessage = str(outdata).replace("'", "")
self.assertTrue(True)
except subprocess.CalledProcessError as err:
self.assertTrue(False)
@@ -54,11 +54,14 @@ class TestSysCommand(unittest.TestCase):
def remove_file(self):
pass
+
class SysCommand(object):
__str_cmd = None
__cmdname = None
__outdata = None
__errdata = None
+ __returnCode = None
+ __exception = None
def __init__(self, cmdname, str_cmd):
""" init """
@@ -66,31 +69,37 @@ class SysCommand(object):
self.__cmdname = cmdname
def getName(self):
- return self.__testname
+ return self.__cmdname
+
+ def setName(self, cmdName):
+ self.__cmdname = cmdName
+
+ def getParams(self):
+ return self.__str_cmd
+
+ def setParam(self, params):
+ self.__str_cmd = params
def execute(self):
- res = -1
+ # try:
+ self.__outdata = None
+ self.__errdata = None
+ self.__exception = None
try:
- self.__outdata = None
- self.__errdata = None
completed = subprocess.run(
self.__str_cmd,
- check=True,
+ check=False,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
+ self.__returnCode = completed.returncode
self.__outdata = completed.stdout
- if completed.returncode is 0:
- res = 0
- if completed.stderr.decode('ascii') != "":
- res = -1
- self.__errdata = completed.stderr
+ self.__errdata = completed.stderr.decode('ascii')
except subprocess.CalledProcessError as err:
- res = -2
- except Exception as t:
- res = -3
- return res
+ self.__exception = err
+ self.__returnCode = -1
+ return self.__returnCode
def getOutput(self):
return self.__outdata
@@ -98,6 +107,12 @@ class SysCommand(object):
def getOutErr(self):
return self.__errdata
+ def getException(self):
+ return self.__exception
+
+ def IsException(self):
+ return self.__exception is not None
+
def getOutputlines(self):
return self.__outdata.splitlines()
@@ -106,3 +121,26 @@ class SysCommand(object):
f.write(self.__outdata)
f.close()
+
+class AsyncSys(threading.Thread):
+ sysObject = None
+ sysres = None
+
+ def __init__(self, threadID, name, counter):
+ threading.Thread.__init__(self)
+ self.threadID = threadID
+ self.name = name
+ self.counter = counter
+ self.sysObject = SysCommand(name, None)
+
+ def setParams(self, Params):
+ self.sysObject.setParam(Params)
+
+ def getRes(self):
+ return self.sysres
+
+ def getData(self):
+ return self.sysObject.getOutput()
+
+ def run(self):
+ self.sysres = self.sysObject.execute()
diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py
index 94181f9..8e96e8c 100644
--- a/test-cli/test/helpers/testsrv_db.py
+++ b/test-cli/test/helpers/testsrv_db.py
@@ -1,10 +1,10 @@
from test.helpers.psqldb import PgSQLConnection
-from test.helpers.setup_xml import XMLSetup
-def find_between( s, first, last ):
+
+def find_between(s, first, last):
try:
- start = s.index( first ) + len( first )
- end = s.index( last, start )
+ start = s.index(first) + len(first)
+ end = s.index(last, start)
return s[start:end]
except ValueError:
return ""
@@ -19,158 +19,242 @@ class TestSrv_Database(object):
def __init__(self):
pass
- def open (self, filename):
- '''Open database connection'''
- self.__xml_setup = XMLSetup(filename)
+ def open(self, xmlObj):
+ self.__xml_setup = xmlObj
self.__sqlObject = PgSQLConnection()
return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr())
- def create_board(self, processor_id, model_id, variant, bmac = None):
+ def getConfig(self):
+ return self.__sqlObject.getConfig()
+
+ def create_board(self, processor_id, model_id, variant, station, bmac=None):
'''create a new board'''
if bmac is None:
- sql = "SELECT isee.create_board('{}', '{}', '{}', NULL);".format(processor_id, model_id, variant)
+ sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', NULL, '{}');".format(processor_id, model_id,
+ variant,
+ station)
else:
- sql = "SELECT isee.create_board('{}', '{}', '{}', '{}');".format(processor_id, model_id, variant, bmac)
- #print('>>>' + sql)
+ sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', '{}', '{}');".format(processor_id, model_id,
+ variant,
+ bmac, station)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res[0][0];
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ print(r)
+ print(str(err))
return None
- def create_model(self, modid, variant, descr, tgid):
- '''create new model'''
- sql = "SELECT isee.create_model('{}', '{}', '{}', '{}')".format(modid, variant, descr, tgid)
- #print('>>>' + sql)
+ def get_tests_list(self, board_uuid):
+ '''get the board test list'''
+ sql = "SELECT * FROM isee.f_get_tests_list('{}')".format(board_uuid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res[0][0];
+ # print(res)
+ return res
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def create_test_definition(self, testname, testdesc, testfunc):
- '''Create a new definition and return definition id on fail (testname already exist) return -1'''
- sql = "SELECT isee.define_test('{}', '{}', '{}')".format(testname, testdesc, testfunc)
- #print('>>>' + sql)
+ def get_test_params_list(self, testid):
+ sql = "SELECT * FROM isee.f_get_test_params_list({})".format(testid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res[0][0];
+ # print(res)
+ return res
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def add_testdef_to_group(self, testgroupid, testname, testparam):
- '''Assign definition to group test return true on success or false if it fails'''
- sql = "SELECT isee.add_test_to_group('{}', '{}', '{}')".format(testgroupid, testname, testparam)
- #print('>>>' + sql)
+ def open_test(self, board_uuid):
+ sql = "SELECT * FROM isee.f_open_test('{}')".format(board_uuid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res[0][0];
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def getboard_test_list(self, board_uuid):
- '''get the board test list'''
- sql = "SELECT isee.gettestlist('{}')".format(board_uuid)
- #print('>>>' + sql)
+ def run_test(self, testid_ctl, testid):
+ sql = "SELECT isee.f_run_test({},{})".format(testid_ctl, testid)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def finish_test(self, testid_ctl, testid, newstatus, textresult):
+ sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult)
+ # print('>>>' + sql)
+ try:
+ print('finish_test => SQL {}'.format(sql))
+ self.__sqlObject.db_execute_query(sql)
+ return True
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ print('finish_test => {}'.format(err))
+ return False
+
+ def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype):
+ try:
+ # generate a new oid
+ fileoid = self.__sqlObject.db_upload_large_file(filepath)
+ # insert into a table
+ sql = "SELECT isee.f_upload_result_file({},{},'{}','{}','{}')".format(testid_ctl, testid, desc, fileoid,
+ mimetype)
+ # print('>>>' + sql)
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def get_task_variables_list(self, board_uuid):
+ sql = "SELECT * FROM isee.f_get_task_variables_list('{}')".format(board_uuid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def getboard_comp_test_list(self, board_uuid):
- '''get the board test list'''
- sql = "SELECT isee.gettestcompletelist('{}')".format(board_uuid)
- #print('>>>' + sql)
+
+ def get_plc_macaddr(self, board_uuid):
+ sql = "SELECT * FROM isee.f_get_plcmac(\'{}\')".format(board_uuid)
+ # print('>>>' + sql)
+ try:
+ res = self.__sqlObject.db_execute_query(sql)
+ return res, ''
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ print(r)
+ return None, r
+
+
+ def get_board_macaddr(self, board_uuid):
+ sql = "SELECT * FROM isee.f_get_board_macaddr('{}')".format(board_uuid)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def open_testbatch(self, board_uuid):
- '''get the board test list'''
- sql = "SELECT isee.open_testbatch('{}')".format(board_uuid)
- #print('>>>' + sql)
+ def open_task(self, uuid):
+ sql = "SELECT * FROM isee.f_open_task('{}')".format(uuid)
+ # print('>>>' + sql)
try:
- res = str(self.__sqlObject.db_execute_query(sql)[0])
- res = res.replace('(', '')
- res = res.replace(')', '')
- res = res.replace(',', '')
- #print(res)
- return res;
+ res = self.__sqlObject.db_execute_query(sql)
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def add_test_to_batch(self, board_uuid, testid, testid_ctl, result, groupid, data):
- '''get the board test list'''
- sql = "SELECT isee.add_test_to_batch_c('{}','{}','{}','{}','{}','{}')".format(board_uuid, testid, testid_ctl, result, groupid, data)
- #print('>>>' + sql)
+ def create_task_result(self, taskid_ctl, name, newstatus, newinfo=None):
+ sql = "SELECT isee.f_create_task_result({},'{}','{}','{}')".format(taskid_ctl, name, newstatus, newinfo)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def update_taskctl_status(self, taskid_ctl, newstatus):
+ sql = "SELECT isee.f_update_taskctl_status({},'{}')".format(taskid_ctl, newstatus)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def set_factorycode(self, uuid, factorycode):
+ sql = "SELECT isee.f_set_factorycode('{}','{}')".format(uuid, factorycode)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def read_station_state(self, station):
+ sql = "SELECT * FROM station.getmystate('{}');".format(station)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def close_testbatch(self, board_uuid, testid_ctl):
- '''get the board test list'''
- sql = "SELECT isee.close_testbatch('{}','{}')".format(board_uuid, testid_ctl)
- #print('>>>' + sql)
+ def change_station_state(self, station, newstate):
+ sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station)
+ print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ print(r)
return None
- def update_set_test_row(self, nstation, testid_ctl, board_uuid, ctest, cstatus):
- '''get the board test list'''
- sql = "SELECT isee.update_set_test_row('{}','{}','{}','{}','{}')".format(nstation, testid_ctl ,board_uuid, ctest, cstatus)
- #print('>>>' + sql)
+ def get_setup_variable(self, skey):
+ sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey)
+ # print('>>>' + sql)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- def getboard_eeprom(self, board_uuid):
- '''get the board eeprom struct '''
- sql = "SELECT isee.getboard_eeprom('{}')".format(board_uuid)
- #print('>>>' + sql)
+ def get_setup_variable(self, skey , default):
+ sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey)
+ # print('>>>' + sql)
+ try:
+ res = self.__sqlObject.db_execute_query(sql)
+ # print(res)
+ return res[0][0]
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return default
+
+ def setDevelStationState(self, station, newState):
+ sql = "UPDATE station.station_state SET state = {} WHERE hostname={}".format(newState, station)
try:
res = self.__sqlObject.db_execute_query(sql)
- #print(res)
- return res;
+ # print(res)
+ return res[0][0]
except Exception as err:
r = find_between(str(err), '#', '#')
- #print(r)
+ # print(r)
return None
- \ No newline at end of file
diff --git a/test-cli/test/helpers/uboot_flasher.py b/test-cli/test/helpers/uboot_flasher.py
deleted file mode 100644
index e69de29..0000000
--- a/test-cli/test/helpers/uboot_flasher.py
+++ /dev/null
diff --git a/test-cli/test/helpers/usb.py b/test-cli/test/helpers/usb.py
new file mode 100644
index 0000000..8f8848d
--- /dev/null
+++ b/test-cli/test/helpers/usb.py
@@ -0,0 +1,270 @@
+from test.helpers.syscmd import SysCommand
+from test.helpers.amper import Amper
+import scanf
+import os
+import json
+
+
+class USBDevices(object):
+ __dev_list = {}
+ __serial_converter = {}
+ __usb_disk = {}
+ __usb_amper = {}
+ __dev_list_filtered = {}
+ __usb_mstorage = {}
+ __xml_config = None
+
+ def __init__(self, xml):
+ self.__xml_config = xml
+ self.__getusblist()
+ self.__getSerialDevices()
+ self.__getAmper()
+ self.__getMassStorage()
+
+ def print_usb(self):
+ print("------ usb list -------")
+ print(json.dumps(self.__dev_list))
+ print("------ serial converter list -------")
+ print(self.__serial_converter)
+ print("------ usb disk list -------")
+ print(self.__usb_disk)
+ print("------ usb amper list -------")
+ print(self.__usb_amper)
+ print("------ usb mass storage list -------")
+ print(self.__usb_mstorage)
+ print("------ filtered list -------")
+ print(json.dumps(self.__dev_list_filtered))
+
+ def refresh(self):
+ self.__dev_list = {}
+ self.__serial_converter = {}
+ self.__usb_disk = {}
+ self.__usb_amper = {}
+ self.__usb_mstorage = {}
+ self.__dev_list_filtered = {}
+ self.__getusblist()
+ self.__getSerialDevices()
+ self.__getAmper()
+ self.__getMassStorage()
+
+ def getMassStorage(self):
+ return self.__usb_mstorage
+
+ def getAmper(self):
+ return self.__usb_amper
+
+ def __getAmper(self):
+ for c, s_items in self.__serial_converter.items():
+ for k,v in s_items.items():
+ if k.find("USB") != -1 and v == "Prolific_Technology_Inc._USB-Serial_Controller":
+ testAmper = Amper(port=k)
+ testAmper.open()
+ res = testAmper.hello()
+ if res:
+ self.__usb_amper = {'port': k, 'ver': testAmper.getVersion()}
+ testAmper.close()
+ if res:
+ return
+
+ def __getMassStorage(self):
+ for c, s_items in self.__usb_disk.items():
+ for k,v in s_items.items():
+ if k == "udev":
+ if v.get('ID_MODEL', None) == "File-Stor_Gadget" or v.get('ID_FS_LABEL', None) != "NODE_L031K6":
+ self.__usb_mstorage = {'disk': v.get('DEVNAME', ''), 'id': v.get('ID_FS_LABEL_ENC', '')}
+
+
+ def __check_ignore_usb_list(self, usb_list, dev_list):
+ count = 0
+ ignore = "0"
+ for _, l in usb_list.items():
+ for k, v in l.items():
+ if k == "ignore":
+ ignore = v
+ else:
+ value = dev_list.get(k, None)
+ if v == value:
+ count += 1
+ else:
+ count = 0
+ break
+ if count >= len(l) -1:
+ break
+
+ if count > 0:
+ if ignore == "1":
+ return True
+ return False
+
+
+ def __create_ignore_list(self, usb_list):
+ count = 1
+ for _, dev in self.__dev_list.items():
+ if not self.__check_ignore_usb_list(usb_list, dev):
+ self.__dev_list_filtered[count] = dev
+ count += 1
+
+ def __getusblist(self):
+ count = 0
+ res = open('/sys/kernel/debug/usb/devices', 'rb').read().decode()
+ list = {}
+ for i in res.split('\n'):
+ if i.find('T:') != -1:
+ if len(list) > 0:
+ count += 1
+ self.__dev_list[count] = list
+ list = self.__parse_T(i)
+ elif i.find('D:') != -1:
+ list = {**list, **self.__parse_D(i)}
+ elif i.find('P:') != -1:
+ list = {**list, **self.__parse_P(i)}
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_S(i)}
+ else:
+ """ Ignore all rest"""
+ pass
+
+ if count > 0:
+ count += 1
+ self.__dev_list[count] = list
+
+ self.__create_ignore_list(self.__xml_config.getUSBlist())
+
+
+ def __parse_T(self, str):
+ data = {}
+ res = scanf.scanf("T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c", str)
+ data["BUS"] = res[0]
+ data["LEV"] = res[1]
+ data["PRNT"] = res[2]
+ data["PORT"] = res[3]
+ data["CNT"] = res[4]
+ data["DEV"] = res[5].lstrip()
+ data["SPEED"] = res[6].rstrip()
+ data["MXCH"] = res[7].lstrip()
+ return data
+
+ def __parse_D(self, str):
+ data = {}
+ str += ' '
+ res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", str)
+ data["VER"] = res[0].lstrip()
+ data["CLASS"] = res[1]
+ return data
+
+ def __parse_P(self, str):
+ data = {}
+ str += ' '
+ res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", str)
+ data["VENDOR"] = res[0].lstrip()
+ data["PRODID"] = res[1]
+ return data
+
+ def __parse_S(self, str):
+ data = {}
+ if str.find('Manufacturer') != -1:
+ pos = str.find('Manufacturer=')
+ data["MANUFACTURER"] = str[pos + len('Manufacturer='):].rstrip()
+ elif str.find('Product') != -1:
+ pos = str.find('Product=')
+ data["PRODUCT"] = str[pos + len('Product='):].rstrip()
+ elif str.find('SerialNumber') != -1:
+ pos = str.find('SerialNumber=')
+ data["SERIAL"] = str[pos + len('SerialNumber='):].rstrip()
+ else:
+ pass
+ return data
+
+ def finddevicebytag(self, tag, str):
+ t = {}
+ count = 0
+ for i, k in self.__dev_list.items():
+ if tag in k:
+ if k[tag].find(str) != -1:
+ t[count] = k
+ count += 1
+ return t
+
+ def __getSerialDevices(self):
+ self.__serial_converter = {}
+ self.__usb_disk = {}
+ self.__usb_amper = {}
+ test_myPath = os.path.dirname(os.path.abspath(__file__))
+ test_myPath = test_myPath + "/../"
+ s = SysCommand('usb', '{}/scripts/usb.sh'.format(test_myPath))
+ s.execute()
+ data_list = s.getOutput().decode()
+ for i in data_list.split('\n'):
+ if len(i) > 0:
+ pos_div = i.find('-')
+ dev = i[0:pos_div - 1]
+ devName = i[pos_div + 2:]
+ # print("dev: {}".format(dev))
+ # print("devName: {}".format(devName))
+ res = scanf.scanf("/dev/sd%c", dev)
+ if res is not None:
+ dev_id = res[0][0]
+ # Now verify if is Disk or Partition
+ res = scanf.scanf("/dev/sd%c%c", dev)
+ if res is not None:
+ # this is a partition and we can ignore it
+ pass
+ else:
+ self.__usb_disk[dev_id] = {dev: devName, 'udev': self.__getudevquery(dev)}
+ else:
+ res = scanf.scanf("/dev/ttyUSB%c", dev)
+ if res is not None:
+ serial_id = res[0][0]
+ self.__serial_converter[serial_id] = {dev: devName, 'udev': self.__getudevquery(dev)}
+
+ def __getudevquery(self, device):
+ list = {}
+ s = SysCommand('query-udev', 'udevadm info --query=all -n {}'.format(device))
+ res = s.execute()
+ if res != 0:
+ return None
+ for i in s.getOutput().decode().split('\n'):
+ if i.find('P:') != -1:
+ list = self.__parse_udev_P(i)
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_udev_S(i)}
+ elif i.find('N:') != -1:
+ list = {**list, **self.__parse_udev_N(i)}
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_udev_S(i)}
+ elif i.find('E:') != -1:
+ list = {**list, **self.__parse_udev_E(i)}
+ return list
+
+ def __parse_udev_P(self, str):
+ pos = str.find('P: ')
+ str = str[len('P: ') + pos:]
+ return {'PATH': str}
+
+ def __parse_udev_S(self, str):
+ pos = str.find('P: ')
+ str = str[len('P: ') + pos:]
+ pos = str.find('disk/by-id/')
+ if pos != -1:
+ return {'DISK_BY_ID': str[len('disk/by-id/') + pos:]}
+ pos = str.find('disk/by-path/')
+ if pos != -1:
+ return {'DISK_BY_PATH': str[len('disk/by-path/') + pos:]}
+ pos = str.find('disk/by-uuid/')
+ if pos != -1:
+ return {'DISK_BY_UUID': str[len('disk/by-uuid/') + pos:]}
+ else:
+ return {}
+
+ def __parse_udev_N(self, str):
+ pos = str.find('N: ')
+ str = str[len('N: ') + pos:]
+ return {'DEV_NAME': str}
+
+ def __parse_udev_E(self, str):
+ pos = str.find('E: ')
+ str = str[len('E: ') + pos:]
+ pos = str.find('=')
+ str_key = str[:pos]
+ str_value = str[pos + 1:]
+ return {str_key: str_value}
diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py
new file mode 100644
index 0000000..8d2c27b
--- /dev/null
+++ b/test-cli/test/helpers/utils.py
@@ -0,0 +1,68 @@
+import json
+import socket
+import os
+import scanf
+
+def save_json_to_disk(filePath, description, mime, json_data, result):
+ with open(filePath, 'w') as outfile:
+ json.dump(json_data, outfile, indent=4)
+ outfile.close()
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+def save_file_to_disk(filePath, description, mime, data, result):
+ with open(filePath, 'w') as outfile:
+ outfile.write(data)
+ outfile.close()
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+def save_result (filePath, description, mime, result):
+ result.append(
+ {
+ "description": description,
+ "filepath": filePath,
+ "mimetype": mime
+ }
+ )
+
+
+def str2bool(v):
+ return v.lower() in ("yes", "true", "t", "1")
+
+def station2Port(base):
+ Name = socket.gethostname()
+ res = scanf.scanf("Station%d", Name)
+ if res[0]:
+ NmBase = int(base)
+ res = int(res[0])
+ return str(NmBase + res)
+ return base
+
+
+def sys_read(sysnode):
+ try:
+ f = open(sysnode, "r")
+ data = f.readline()
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, data
+
+def removefileIfExist(fname):
+ if os.path.exists(fname):
+ try:
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''