diff options
author | Manel Caro <mcaro@iatec.biz> | 2021-11-06 16:28:38 +0100 |
---|---|---|
committer | Manel Caro <mcaro@iatec.biz> | 2021-11-06 16:28:38 +0100 |
commit | cf19bfe18cbd283b188a858ee1629f9909c924f4 (patch) | |
tree | 1efb23519727130058401df090ab1b5f4cc8ba99 /test-cli/test/helpers | |
parent | b6932fbaf898724ae87c29f8965621610f377084 (diff) | |
parent | d5b273a3b58a250742049df4ca0ef0ba54f53d33 (diff) | |
download | board-cf19bfe18cbd283b188a858ee1629f9909c924f4.zip board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.gz board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.bz2 |
Diffstat (limited to 'test-cli/test/helpers')
27 files changed, 1676 insertions, 349 deletions
diff --git a/test-cli/test/helpers/DiskHelpers.py b/test-cli/test/helpers/DiskHelpers.py new file mode 100644 index 0000000..79b661e --- /dev/null +++ b/test-cli/test/helpers/DiskHelpers.py @@ -0,0 +1,9 @@ +import stat +import os + + +def disk_exists(path): + try: + return stat.S_ISBLK(os.stat(path).st_mode) + except: + return False diff --git a/test-cli/test/helpers/__init__.pyc b/test-cli/test/helpers/__init__.pyc Binary files differindex 7d1c907..01014e2 100644 --- a/test-cli/test/helpers/__init__.pyc +++ b/test-cli/test/helpers/__init__.pyc diff --git a/test-cli/test/helpers/amper.py b/test-cli/test/helpers/amper.py new file mode 100644 index 0000000..ea719f6 --- /dev/null +++ b/test-cli/test/helpers/amper.py @@ -0,0 +1,154 @@ +import serial +import scanf + + +class Amper(object): + __ser = None + __port = '/dev/ttyUSB0' + __speed = 115200 + __parity = None + __rtscts = 0 + __timeout = 1 + __isThere = False + __version = None + __voltage = 0.0 + __current = 0.0 + __alarm_condition = 0 + + def __init__(self, port='/dev/ttyUSB0', serial_speed=115200, parity=serial.PARITY_NONE, rtscts=0): + self.__port = port + self.__speed = serial_speed + self.__parity = parity + self.__rtscts = rtscts + + def open(self): + try: + if self.__ser is not None: + self.close() + self.__ser = serial.Serial(port=self.__port, + baudrate=self.__speed, + timeout=self.__timeout, + parity=self.__parity, + rtscts=self.__rtscts, + bytesize=serial.EIGHTBITS, + stopbits=serial.STOPBITS_ONE, + xonxoff=0) + + self.__ser.flushInput() + self.__ser.flushOutput() + self.__ser.break_condition = False + return True + except serial.SerialException as err: + print(err) + return False + + def close(self): + if self.__ser is not None: + self.__ser.close() + self.__ser = None + self.__isThere = False + self.__version = None + self.__voltage = 0.0 + self.__current = 0.0 + self.__alarm_condition = 0 + + def __write(self, data): + data += "\n\r" + dat = data.encode('ascii') + self.__ser.write(dat) + + def __read(self): + return self.__ser.readline().decode().rstrip() + + def __set_voltage_underlimit(self, under_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_UV_W_LIM={}'.format(under_limit)) + dat = self.__read() + print(dat) + + def __get_voltage_underlimit(self): + self.__write('AT+VIN_UV_W_LIM=?') + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def __set_voltage_overlimit(self, over_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_OV_W_LIM={}'.format(over_limit)) + dat = self.__read() + print(dat) + + def __get_voltage_overlimit(self): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_OV_W_LIM?') + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def __set_current_overlimit(self, over_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+IOUT_W_LIM={}'.format(over_limit)) + dat = self.__read() + print(dat) + + def __get_current_overlimit(self): + if self.__isThere and self.__ser is not None: + self.__write('AT+IOUT_W_LIM?') + dat = self.__read() + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def hello(self): + self.__isThere = False + if self.__ser is None: + return False + self.__write('at+ver?') + dat = self.__read() + res = scanf.scanf("ISEE Amper v%3c", dat) + if res is None: + return False + self.__isThere = True + self.__version = res[0] + return True + + def getVersion(self): + return self.__version + + def getVoltage(self): + if self.__isThere and self.__ser is not None: + self.__write('at+vin?') + dat = self.__read() + res = scanf.scanf("%f", dat) + self.__voltage = res[0] + # print(self.__voltage) + return self.__voltage + else: + return None + + def getCurrent(self): + if self.__isThere and self.__ser is not None: + self.__write('at+in?') + dat = self.__read() + res = scanf.scanf("%f", dat) + self.__current = res[0] + # print(self.__current) + else: + return None + return self.__current + + def getAlarm(self): + if self.__isThere and self.__ser is not None: + self.__write('at+st_mfr?') + dat = self.__read() + # print(dat) + self.__alarm_condition = (int(dat) & 0x0F) + # print(self.__alarm_condition) + return self.__alarm_condition + + def clearAlarm(self): + if self.__isThere and self.__ser is not None: + self.__write('at+CLRFAULT') + dat = self.__read() + # print(dat) + diff --git a/test-cli/test/helpers/button_script.sh b/test-cli/test/helpers/button_script.sh deleted file mode 100644 index 6908f22..0000000 --- a/test-cli/test/helpers/button_script.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env bash -i2cset -f -y 1 0x2d 0x40 0x31 -i2cset -f -y 1 0x2d 0x50 0xff -i2cget -f -y 1 0x2d 0x50 diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py new file mode 100644 index 0000000..b23df74 --- /dev/null +++ b/test-cli/test/helpers/camara.py @@ -0,0 +1,116 @@ +import cv2 +import sh + +class Camara(object): + __device_name = None + __device = None + __setupScriptPath = '' + __w = 1280 + __h = 720 + __contrast = 0.0 + __brightness = 0.0 + __saturation = 55.0 + __hue = 0.0 + __exposure = 166 + + def __init__(self, setup_script_path, device="video0", width=1280, height=720): + self.__device_name = device + self.__w = width + self.__h = height + self.__setupScriptPath = setup_script_path + + def Close(self): + if self.__device is not None: + del self.__device + self.__device = None + + def Open(self): + self.Close() + self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name)) + if self.__device.isOpened(): + self.__configure() + return True + return False + + def getSize(self): + return self.__w, self.__h + + def setSize(self, w, h): + if self.__device is not None and self.__device.isOpened(): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h) + else: + self.__w = w + self.__h = h + + def setContrast(self, newVal): + if self.__device.isOpened(): + self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal) + else: + self.__contrast = newVal + + def getContrast(self): + return self.__contrast + + def setBrightness(self, newVal): + if self.__device.isOpened(): + self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal) + else: + self.__brightness = newVal + + def getBrightness(self): + return self.__brightness + + def __configure(self): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h) + sh.bash(self.__setupScriptPath) + + def __setCamVar(self, key, val): + valold = cv2.VideoCapture.get(self.__device, key) + if valold != val: + cv2.VideoCapture.set(self.__device, key, val) + t = cv2.VideoCapture.get(self.__device, key) + return t + return val + + def __getCamVar(self, key): + return cv2.VideoCapture.get(self.__device, key); + + def getFrameCount(self): + return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE); + + def getFrame(self): + if self.__device.isOpened(): + retval, image = self.__device.read() + if retval: + return image + return None + else: + return None + + def getImageSize(self, image): + if hasattr(image, 'shape'): + return image.shape[1], image.shape[0] + else: + return (0, 0, 0) + + def showFrame(self, name, frame, w=False, maxTime=3000): + cv2.imshow(name, frame) + if w: + if maxTime == -1: + cv2.waitKey() + else: + cv2.waitKey(maxTime) + + def saveFrame(self, fileName, Frame): + cv2.imwrite(fileName, Frame) + + def readImage(self, filename): + return cv2.imread('{}'.format(filename), 0) + + def destroyWindow(self, name): + cv2.destroyWindow(name) + + def closeWindows(self): + cv2.destroyAllWindows()
\ No newline at end of file diff --git a/test-cli/test/helpers/changedir.py b/test-cli/test/helpers/changedir.py new file mode 100644 index 0000000..fad9ade --- /dev/null +++ b/test-cli/test/helpers/changedir.py @@ -0,0 +1,14 @@ +import os + + +class changedir: + """Context manager for changing the current working directory""" + def __init__(self, newPath): + self.newPath = os.path.expanduser(newPath) + + def __enter__(self): + self.savedPath = os.getcwd() + os.chdir(self.newPath) + + def __exit__(self, etype, value, traceback): + os.chdir(self.savedPath)
\ No newline at end of file diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py new file mode 100644 index 0000000..db94a1c --- /dev/null +++ b/test-cli/test/helpers/cmdline.py @@ -0,0 +1,28 @@ + +class LinuxKernelCmd: + + __kernel_vars = {} + + def __init__(self): + self.parse_kernel_cmdline() + + def parse_kernel_cmdline(self): + cmdline = open('/proc/cmdline', 'rb').read().decode() + cmd_array = cmdline.split() + i = 0 + for f in cmdline.split(): + pos = f.find('=') + if pos == -1: + cmd_array[i - 1] = cmd_array[i - 1] + " " + f + cmd_array.remove(cmd_array[i]) + else: + i += 1 + self.__kernel_vars = {} + for f in cmd_array: + dat = f.split('=') + self.__kernel_vars[dat[0]] = dat[1] + + def getkvar(self, vName, vdefault): + if vName in self.__kernel_vars: + return self.__kernel_vars[vName] + return vdefault diff --git a/test-cli/test/helpers/cv_display_test.py b/test-cli/test/helpers/cv_display_test.py index b54a698..3a3004f 100644 --- a/test-cli/test/helpers/cv_display_test.py +++ b/test-cli/test/helpers/cv_display_test.py @@ -14,8 +14,7 @@ def pattern_detect(cam_device=0): # RETURN 0 only if the test is ok msg="0" # Capture the corresponding camera device [0,1] - #capture = cv2.VideoCapture(0) - capture = cv2.VideoCapture("/dev/v4l/by-id/usb-Creative_Technology_Ltd._Live__Cam_Sync_HD_VF0770-video-index0") + capture = cv2.VideoCapture(0) try: _, image = capture.read() except: @@ -64,12 +63,10 @@ def pattern_detect(cam_device=0): # After testing the gamma factor correction is computed with the green color # gamma = gamma_factor / blue+red into green part # gamma factor = 50-100 - #gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1)) - gamma=1 + gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1)) # Adjust the image acording to this gamma value adjusted = adjust_gamma(image, gamma=gamma) -# adjusted=image - cv2.imwrite( "/home/root/result_hdmi_img.jpg", adjusted); + # Calculate again the average color using the gamma adjusted image # Crop the gamma adjusted image for wach color section red_cal = adjusted[y1:y2, xr1:xr2] @@ -109,19 +106,14 @@ def pattern_detect(cam_device=0): mask_r = mask0 + mask1 # mask_r = cv2.inRange(hsv, lower_red, upper_red) # Perform a morphological open to expand -# kernel = np.ones((5, 5), np.uint8) -# closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel) -# closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel) -# closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel) + kernel = np.ones((15, 15), np.uint8) + closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel) + closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel) + closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel) # Count the number of pixels that are not of the corresponding color (black) -# count_r = cv2.countNonZero(closing_r) -# count_b = cv2.countNonZero(closing_b) -# count_g = cv2.countNonZero(closing_g) - #----------- - count_r = cv2.countNonZero(mask_r) - count_b = cv2.countNonZero(mask_b) - count_g = cv2.countNonZero(mask_g) - #------- + count_r = cv2.countNonZero(closing_r) + count_b = cv2.countNonZero(closing_b) + count_g = cv2.countNonZero(closing_g) if (count_r < 5): msg = "RED COUNT FAIL" return msg diff --git a/test-cli/test/helpers/detect.py b/test-cli/test/helpers/detect.py new file mode 100644 index 0000000..193dabf --- /dev/null +++ b/test-cli/test/helpers/detect.py @@ -0,0 +1,61 @@ +import cv2 +import numpy as np + + +class Detect_Color(object): + oFrame = None + img_hsv = None + __red_lower1 = [0, 50, 20] + __red_upper1 = [5, 255, 255] + __red_lower2 = [175, 50, 20] + __red_upper2 = [180, 255, 255] + __blue_lower = [110, 50, 50] + __blue_upper = [130, 255, 255] + __green_lower = [36, 25, 25] + __green_upper = [86, 255, 255] + + def __init__(self, frame): + self.oFrame = frame + self.__hist() + + def __hist(self): + self.img_hsv = cv2.cvtColor(self.oFrame, cv2.COLOR_BGR2HSV) + + def getRed(self): + return self.__detect_two_areas([0, 50, 20], [5, 255, 255], [175, 50, 20], [180, 255, 255]) + + def getBlue(self): + return self.__detect_one_area([110, 50, 50], [130, 255, 255]) + + def getGreen(self): + return self.__detect_one_area([36, 25, 25], [86, 255, 255]) + + def __detect_one_area(self, lower, upper): + a_lower = np.array(lower) + a_upper = np.array(upper) + c_mask = cv2.inRange(self.img_hsv, a_lower, a_upper) + croped = cv2.bitwise_and(self.oFrame, self.oFrame, mask=c_mask) + mean_v = cv2.mean(self.oFrame, mask=c_mask) + mean = {} + mean['R'] = mean_v[2] + mean['G'] = mean_v[1] + mean['B'] = mean_v[0] + count = cv2.countNonZero(c_mask) + return c_mask, croped, mean, count + + def __detect_two_areas(self, lower1, upper1, lower2, upper2): + a1_lower = np.array(lower1) + a1_upper = np.array(upper1) + a2_lower = np.array(lower2) + a2_upper = np.array(upper2) + c_mask1 = cv2.inRange(self.img_hsv, a1_lower, a1_upper) + c_mask2 = cv2.inRange(self.img_hsv, a2_lower, a2_upper) + c_mask = cv2.bitwise_or(c_mask1, c_mask2) + croped = cv2.bitwise_and(self.oFrame, self.oFrame, mask=c_mask) + mean_v = cv2.mean(self.oFrame, mask=c_mask) + mean = {} + mean['R'] = mean_v[2] + mean['G'] = mean_v[1] + mean['B'] = mean_v[0] + count = cv2.countNonZero(c_mask) + return c_mask, croped, mean, count diff --git a/test-cli/test/helpers/finisher.py b/test-cli/test/helpers/finisher.py deleted file mode 100644 index 73142d9..0000000 --- a/test-cli/test/helpers/finisher.py +++ /dev/null @@ -1,151 +0,0 @@ -from test.helpers.syscmd import SysCommand -from test.helpers.globalVariables import globalVar -import binascii -import uuid -import subprocess -from PIL import Image, ImageDraw, ImageFont -import qrcode -import PIL - -class Finisher(object): - __muhb = None - __final_to_burn_to_eeprom = None - __qr_image = None - - def __init__(self, muhb = None): - self.__muhb = muhb - self.__final_to_burn_to_eeprom = bytearray() - self.__qr_image = None - pass - - def eeprom_burn(self): - ## Create binary file - str_cmd2 = "find /sys/ -iname 'eeprom'" - eeprom_location = SysCommand("eeprom_location", str_cmd2) - if eeprom_location.execute() == 0: - raw_out = eeprom_location.getOutput() - if raw_out == "": - self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?") - eeprom = raw_out.decode('ascii') - eeprom = eeprom.strip('\n') - ## push binary data to eeprom like if working with files - file2 = open(eeprom, "w+b") - file2.write(self.__final_to_burn_to_eeprom) - else: - self.fail("failed: could not complete find eeprom command") - - def generate_qr_stamp(self): - # Generate QR to put in a stamp - qr = qrcode.QRCode( - version=1, - error_correction=qrcode.constants.ERROR_CORRECT_L, - box_size=10, - border=4, - ) - # Use board_uuid to generate the QR code - qr.add_data(globalVar.g_uuid) - qr.make(fit=True) - # Save QR as image stamp.png - img = qr.make_image() - # Store QR as a class atrib - self.__qr_image = img - img.save('/home/root/stamp.png') - - def print_stamp(self): - # Print stamp by sending a cmd to the printer - str_cmd3 = "lpr -o scaling=4 stamp.png".format(self.__display) - printstamp = SysCommand("printstamp", str_cmd3) - if printstamp.execute() != 0: - self.fail("failed: could not print stamp") - - def generate_screen(self): - # Generate green image with board uuid and QR maybe pasted on top of green image - # Define image size - W = 1250 - H = 703 - # Create blank rectangle to write on - image = Image.new('RGB', (W, H), (46, 204, 113, 0)) - draw = ImageDraw.Draw(image) - message = "TEST OK\n\n" + "Board UUID: "+ globalVar.g_uuid - font = ImageFont.truetype('/usr/share/fonts/truetype/dejavuu/DejaVuSans.ttf', 40) - # Calculate the width and height of the text to be drawn, given font size - w, h = draw.textsize(message, font=font) - # Write the text to the image, where (x,y) is the top left corner of the text - draw.text(((W-w)/2,(H-h)/2), message, align='center', font=font) - #draw.image(self.__qr_image) - image.save('/home/root/test/files/test_ok.png') - - def show_result_screen(self): - # If test OK show test_OK.png located in /home/root, If test fail show test_fail.png, located in /home/root/test/files/test_KO.png - if globalVar.fstatus: - str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ok.png".format('fb0') - else: - str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ko.png".format('fb0') - - display_image = SysCommand("display_image", str_cmd4) - #print(display_image.execute()) - if display_image.execute() != -1: - self.fail("failed: could not display the image") - - - def end_ok(self): - # Burn retrieved igep eeprom struct - new = self.__muhb[0][0] - - # Convert from string to hex - hnew = new.encode() - # Magic_id and default crc32 0x6d, 0x6a, 0x6d, 0xe4 with endianess changed so that u-boot loads it correctly - # IF magic ever changes this magic_id should be changed. At the end always magic_id of test and u-boot should - # be the same - magic_id = bytes([0xe4, 0x6d, 0x6a, 0x6d]) - default_igep_crc32 = bytes([0x00, 0x00, 0x00, 0x00]) - - # Create bytearray - to_calculate = bytearray() - # Build the final hex binary for crc32 operation - to_calculate.extend(magic_id) - to_calculate.extend(default_igep_crc32) - to_calculate.extend(hnew) - - # Calculate crc32! - new_crc32 = binascii.crc32(to_calculate) - hnew_crc32 = new_crc32.to_bytes(4, byteorder="little") - - # Recreate final eeprom struct in bytearray - self.__final_to_burn_to_eeprom = bytearray() - self.__final_to_burn_to_eeprom.extend(magic_id) - self.__final_to_burn_to_eeprom.extend(hnew_crc32) - self.__final_to_burn_to_eeprom.extend(hnew) - self.eeprom_burn() - - # Generate QR stamp - self.generate_qr_stamp() - # Send print stamp command - #self.print_stamp() - # Generate green image with board uuid and QR maybe pasted on top of green image - self.generate_screen() - # Show test_ok.png image in screen - self.show_result_screen() - - def end_fail(self): - # Burn igep eeprom bug struct - hnew = self.__muhb.encode() - default_fail = bytes([0xD0, 0xBA, 0xD0, 0xBA]) - self.__final_burn_to_eeprom = bytearray() - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(hnew) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.eeprom_burn() - # Show test_ko.png image in screen - self.show_result_screen()
\ No newline at end of file diff --git a/test-cli/test/helpers/get_dieid.py b/test-cli/test/helpers/get_dieid.py deleted file mode 100644 index b20f143..0000000 --- a/test-cli/test/helpers/get_dieid.py +++ /dev/null @@ -1,43 +0,0 @@ -import mmap -import os -import struct -MAP_MASK = mmap.PAGESIZE - 1 -WORD = 4 -def read(addr): - """ Read from any location in memory - Returns the readed value in hexadecimal format - Keyword arguments: - - addr: The memory address to be readed. - """ - fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC) - # Map one page - mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK) - mm.seek(addr & MAP_MASK) - retval = struct.unpack('I', mm.read(WORD)) - mm.close() - os.close(fd) - return "%08X" % retval[0] - -def getRegisters(model): - if model.find("IGEP0046") == 0: - registers = [0x021BC420, 0x021BC410] - elif model.find("IGEP0000") == 0: - registers = [0x021BC420, 0x021BC410] - elif model.find("IGEP0034") == 0 or model.find("SOPA0000") == 0: - registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] - elif model.find("OMAP3") == 0: - registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] - elif model.find("OMAP5") == 0: - registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] - return registers - -def genDieid(modelid): - registers=getRegisters(modelid) - id="" - for i in range(len(registers)): - id=id+(read(registers[i])) - return id - -#if __name__ == "__main__": - #registers = [0x021BC420, 0x021BC410] - #print(main(registers)) diff --git a/test-cli/test/helpers/globalVariables.py b/test-cli/test/helpers/globalVariables.py index c4d8358..baaae57 100644 --- a/test-cli/test/helpers/globalVariables.py +++ b/test-cli/test/helpers/globalVariables.py @@ -6,5 +6,4 @@ def globalVar(): g_mid = "" outdata = "NONE" station = "" - fstatus = "" - gdisplay = None
\ No newline at end of file + taskid_ctl = "" diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py new file mode 100644 index 0000000..f127f2b --- /dev/null +++ b/test-cli/test/helpers/gpio.py @@ -0,0 +1,55 @@ +import os + +class gpio (object): + gpioNum = None + + def __init__(self, gpioNum, dir, val): + self.gpioNum = gpioNum + self.__export(gpioNum) + self.__set_dir(gpioNum, dir) + if dir == 'out': + self.__set_value(gpioNum, val) + + def set_val(self, value): + self.__set_value( self.gpioNum, value) + + + def __export(self, gpio_number): + if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/export", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __unexport(self, gpio_number): + if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/unexport", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_dir(self, gpio_number, dir): + try: + f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n") + val = f.readline() + if val != dir: + f.write(dir) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_value(self, gpio_number, value): + try: + f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n") + f.write(str(value)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True
\ No newline at end of file diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py new file mode 100644 index 0000000..133387c --- /dev/null +++ b/test-cli/test/helpers/int_registers.py @@ -0,0 +1,68 @@ +import mmap +import os +import struct +import sh + +MAP_MASK = mmap.PAGESIZE - 1 +WORD = 4 + + +def read(addr): + """ Read from any location in memory + Returns the readed value in hexadecimal format + Keyword arguments: + - addr: The memory address to be readed. + """ + fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC) + # Map one page + mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK) + mm.seek(addr & MAP_MASK) + retval = struct.unpack('I', mm.read(WORD)) + mm.close() + os.close(fd) + return "%08X" % retval[0] + +def imx8m_readid(): + f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n") + val = f.readline() + f.close() + return val.rstrip() + + +def get_die_id(modelid): + dieid = "" + + # get registers + if modelid.find("IGEP0046") == 0: + registers = [0x021BC420, 0x021BC410] + elif modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: + # registers: mac_id0_lo, mac_id0_hi, mac_id1_lo, mac_id1_hi + registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] + elif modelid.find("OMAP3") == 0 or modelid.find("IGEP0020") == 0 or modelid.find("IGEP0030") == 0: + registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] + elif modelid.find("OMAP5") == 0: + registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] + elif modelid.find("IGEP0048") == 0: + return imx8m_readid() + else: + raise Exception('modelid not defined: {}, modelid') + for rg in registers: + dieid = dieid + read(rg) + return dieid + + +def get_internal_mac(modelid): + mac = None + + if modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: + # # registers: mac_id0_lo, mac_id0_hi + # registers = [0x44e10630, 0x44e10634] + # mac = "" + # for rg in registers: + # mac = mac + read(rg) + # #erase trailing zeros + # mac = mac[4::1] + # # To be finished... + mac = sh.cat("/sys/class/net/eth0/address").strip() + + return mac diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py new file mode 100644 index 0000000..4b1087c --- /dev/null +++ b/test-cli/test/helpers/iseelogger.py @@ -0,0 +1,62 @@ +import logging +import logging.handlers +import datetime + +class ISEE_Logger(object): + __logger = None + __logHandler = None + __formater = None + __logHandlerConsole = None + + def __init__(self, level=logging.INFO): + # Create syslog logger + self.__logger = logging.getLogger('ISEE_logger') + self.__logger.setLevel(level) + self.__logHandler = logging.handlers.SysLogHandler('/dev/log') + self.__logHandlerConsole = logging.StreamHandler() + self.__formater = logging.Formatter('Python: { "loggerName":"%(name)s", "timestamp":"%(asctime)s", "pathName":"%(pathname)s", "logRecordCreationTime":"%(created)f", "functionName":"%(funcName)s", "levelNo":"%(levelno)s", "lineNo":"%(lineno)d", "time":"%(msecs)d", "levelName":"%(levelname)s", "message":"%(message)s"}') + self.__logHandler.formatter = self.__formater + self.__logHandlerConsole.formatter = self.__formater + self.__logger.addHandler(self.__logHandler) + self.__logger.addHandler(self.__logHandlerConsole) + + def setLogLevel(self, level): + if level.upper() == "DEBUG": + nlevel = logging.DEBUG + elif level.upper() == "INFO": + nlevel = logging.INFO + elif level.upper() == "ERROR": + nlevel = logging.ERROR + elif level.upper() == "WARNING": + nlevel = logging.WARNING + else: + nlevel = logging.DEBUG + + self.__logger.setLevel(nlevel) + + def getlogger(self): + return self.__logger + +class MeasureTime: + __difference = None + __first_time = None + __later_time = None + + def __init__(self): + self.__first_time = datetime.datetime.now() + + def start(self): + self.first_time = datetime.datetime.now() + + def stop(self): + self.__later_time = datetime.datetime.now() + self.__difference = self.__later_time - self.__first_time + return self.__difference.total_seconds() + + def getTime(self): + return self.__difference.total_seconds() + + +global logObj +logObj = ISEE_Logger(logging.INFO) + diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py new file mode 100644 index 0000000..8e29448 --- /dev/null +++ b/test-cli/test/helpers/iw.py @@ -0,0 +1,124 @@ +from sh import iw +from scanf import scanf + +class iw_scan: + __interface = None + __raw_unparsed = [] + __raw = None + + def __init__(self, interface='wlan0'): + self.__interface = interface + + def scan(self): + self.__raw_unparsed = [] + self.__raw = iw('{}'.format(self.__interface), 'scan') + if self.__raw.exit_code == 0: + self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n') + return True + return False + + def getRawData (self): + return self.__raw_unparsed + + def getInterface(self): + return self.__interface + + def getRawObject(self): + return self.__raw + + +class iwScan: + __iw_scan = None + __station_list = {} + + def __init__(self, interface='wlan0'): + self.__iw_scan = iw_scan(interface) + + def scan(self): + if self.__iw_scan.scan(): + self.__parse_scan(self.__iw_scan.getRawData()) + return True + return False + + def getLastError(self): + rObject = self.__iw_scan.getRawObject() + if rObject.exit_code != 0: + return rObject.stderr + return '' + + def __Parse_BBS(self, line): + data = {} + res = scanf("BSS %s(on %s) -- %s", line) + if res == None: + res = scanf("BSS %s(on %s)", line) + data["MAC"] = res[0] + data["DEV"] = res[1] + if len(res) == 3: + data["ASSOCIATED"] = 'YES' + else: + data["ASSOCIATED"] = 'NO' + return data + + def __Parse_T(self, id, OnNotFoundAttr ,line): + data = {} + res = scanf("{}: %s".format(id), line) + if res == None: + data[id.upper()] = OnNotFoundAttr + else: + data[id.upper()] = res[0] + return data + + def __Parse_X(self, line): + data = {} + res = scanf("DS Parameter set: channel %s", line) + if res == None: + data['CHANNEL'] = '-1' + else: + data['CHANNEL'] = res[0] + return data + + def __block_stationlist(self, rawdata): + list = {} + count = 0 + for line in rawdata: + idx = line.find('BSS') + if idx != -1 and idx == 0: + if len(list) > 0: + count += 1 + self.__station_list[count] = list + list = self.__Parse_BBS(line) + elif line.find('SSID:') != -1: + list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)} + elif line.find('freq:') != -1: + list = {**list, **self.__Parse_T('freq', '', line)} + elif line.find('signal:') != -1: + list = {**list, **self.__Parse_T('signal', '', line)} + elif line.find('DS Parameter set:') != -1: + list = {**list, **self.__Parse_X(line)} + if count > 0: + count += 1 + self.__station_list[count] = list + + def __parse_scan(self, rawData): + self.__station_list = {} + self.__block_stationlist(rawData) + #print('{}'.format(self.__station_list)) + #for i in self.__station_list: + # print(self.__station_list[i]) + + def findConnected (self): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('ASSOCIATED', 'NO') == 'YES': + return True, self.__station_list[i] + return False, None + + def findBySSID(self, ssid): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('SSID', '') == ssid: + return True, self.__station_list[i] + return False, None + + def getStations(self): + return self.__station_list
\ No newline at end of file diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py new file mode 100644 index 0000000..9bc9e23 --- /dev/null +++ b/test-cli/test/helpers/md5.py @@ -0,0 +1,8 @@ +import hashlib + +def md5_file(fname): + hash_md5 = hashlib.md5() + with open(fname, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest()
\ No newline at end of file diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py new file mode 100644 index 0000000..069058d --- /dev/null +++ b/test-cli/test/helpers/plc.py @@ -0,0 +1,82 @@ +import sh +from sh import flashcp +from sh import flash_eraseall +from sh import ErrorReturnCode +from sh import Command +from test.helpers.gpio import gpio +import time + + +class dcpPLC(object): + __nReset = None + __Phy = None + __Plc = None + __mtd_device = None + __factool = None + __myConfigTool = None + + def __init__(self, plcfactool, mtd_device): + # default save firmware + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + self.__factool = plcfactool + self.__mtd_device = mtd_device + self.__myConfigTool = Command(self.__factool) + + def setSaveFirmwareMode(self): + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + + def setBootMode(self): + self.__nRest = gpio('78', 'out', '0') + self.__nRest = gpio('75', 'out', '1') + self.__nRest = gpio('69', 'out', '1') + + def setPLCReset(self): + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + time.sleep(1) + self.__nRest = gpio('69', 'out', '1') + self.__nRest = gpio('75', 'out', '1') + + def SaveFirmware(self, firmare): + self.setSaveFirmwareMode() + try: + flash_eraseall(self.__mtd_device) + flashcp(firmare, self.__mtd_device) + except ErrorReturnCode as Error: + return False, "plc flash firmware failed {} ".format(Error.exit_code) + return True, '' + + def set_plc(self, var, value, password): + try: + res = self.__myConfigTool("-o", "SET", "-p", "{}={}".format(var, value), "-w", "{}".format(password)) + print(res) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var, Error.exit_code) + return True, '' + + def set_plc2(self, var1, value1, var2, value2, password): + try: + res = self.__myConfigTool("-o", "SET", "-p", "{}={}".format(var1, value1), "-p", + "{}={}".format(var2, value2), "-w", "{}".format(password)) + print(res) + except ErrorReturnCode as Error: + return False, "set var failed {}".format(Error.exit_code) + return True, '' + + def get_plc(self, var, value, password): + try: + self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password)) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var, Error.exit_code) + return True, '' + + def discover(self): + try: + self.__myConfigTool("-o", "DISCOVER") + except ErrorReturnCode: + return False + return True diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py index 26dd03d..0141414 100644 --- a/test-cli/test/helpers/psqldb.py +++ b/test-cli/test/helpers/psqldb.py @@ -1,34 +1,40 @@ import psycopg2 +import psycopg2.extras + class PgSQLConnection(object): - """aaaaaaa""" + """Postgres Connection Object""" __conection_object = None __db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', - 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} + 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - def __init__ (self): -# self.__conection_object = None -# if connect_str is not None: -# self.__db_config = connect_str -# else: - self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', - 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} + def __init__(self, connect_str=None): + self.__conection_object = None + if connect_str is not None: + self.__db_config = connect_str + else: + self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', + 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - def db_connect (self, connect_str): + def db_connect(self, connect_str=None): result = False try: if connect_str == None: self.__conection_object = psycopg2.connect(**self.__db_config) else: - self.__db_config = connect_str; + self.__db_config = connect_str self.__conection_object = psycopg2.connect(**self.__db_config) + self.__conection_object.autocommit = True result = True except Exception as error: print(error) return result + def getConfig(self): + return self.__db_config + def db_execute_query(self, query): cur = self.__conection_object.cursor() cur.execute(query) @@ -36,20 +42,26 @@ class PgSQLConnection(object): cur.close() return data + def db_upload_large_file(self, filepath): + # a new connection must be created to use large objects + __conn = psycopg2.connect(**self.__db_config) + lobj = __conn.lobject(oid=0, mode="rw", new_oid=0, new_file=filepath) + fileoid = lobj.oid + lobj.close() + __conn.commit() + __conn.close() + return fileoid + def commit(self): self.__conection_object.commit() - def db_close(self): if self.__conection_object is not None: self.__conection_object.close() del self.__conection_object self.__conection_object = None - def __del__(self): if self.__conection_object is not None: self.__conection_object.close() - #print("disconnected") - - + # print("disconnected") diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py new file mode 100644 index 0000000..51e5247 --- /dev/null +++ b/test-cli/test/helpers/qrreader.py @@ -0,0 +1,123 @@ +import evdev +from evdev import InputDevice, categorize, ecodes +import threading +import time +import selectors +from selectors import DefaultSelector, EVENT_READ +from test.helpers.iseelogger import logObj + +selector = selectors.DefaultSelector() + +qrdevice_list = [ + "Honeywell Imaging & Mobility 1900", + "Manufacturer Barcode Reader", + "SM SM-2D PRODUCT HID KBW", + "Honeywell Imaging & Mobility 1400g" +] + +qrkey_list = {'KEY_0': '0', + 'KEY_1': '1', + 'KEY_2': '2', + 'KEY_3': '3', + 'KEY_4': '4', + 'KEY_5': '5', + 'KEY_6': '6', + 'KEY_7': '7', + 'KEY_8': '8', + 'KEY_9': '9' + } + + +class QRReader: + __qrstr = "" + __myReader = {} + __numdigits = 10 + __dev = None + + def __init__(self): + self.getQRlist() + + def getQRlist(self): + devices = [evdev.InputDevice(path) for path in evdev.list_devices()] + logObj.getlogger().debug("{}".format(devices)) + for device in devices: + logObj.getlogger().debug("{}".format(device.name)) + if device.name in qrdevice_list: + self.__myReader['NAME'] = device.name + # print(self.__myReader['NAME']) + self.__myReader['PATH'] = device.path + # print(self.__myReader['PATH']) + self.__myReader['PHYS'] = device.phys + # print(self.__myReader['PHYS']) + + def IsQR(self): + return 'NAME' in self.__myReader + + def getQRNumber(self): + return self.__qrstr + + def openQR(self): + if self.IsQR(): + self.closeQR() + self.__dev = InputDevice(self.__myReader['PATH']) + return True + return False + + def closeQR(self): + if self.__dev: + del self.__dev + self.__dev = None + + def readQR(self): + """" Sync Read up to numdigits """ + count = 0 + self.__qrstr = "" + if self.__dev: + self.__dev.grab() + for event in self.__dev.read_loop(): + if event.type == evdev.ecodes.EV_KEY: + c_event = categorize(event) + if c_event.keycode in qrkey_list: + if c_event.keystate == 0: + self.__qrstr += qrkey_list[c_event.keycode] + count += 1 + if count == self.__numdigits: + break + self.__dev.ungrab() + return True + return False + + def wait_event(self, timeout): + selector.register(self.__dev, selectors.EVENT_READ) + while True: + events = selector.select(timeout); + if not events: + return False + else: + return True + + def readQRasync(self, timeout): + count = 0 + r = False + self.__qrstr = "" + if self.__dev: + self.__dev.grab() + if self.wait_event(timeout): + for event in self.__dev.read_loop(): + if event.type == evdev.ecodes.EV_KEY: + c_event = categorize(event) + if c_event.keycode in qrkey_list: + if c_event.keystate == 0: + self.__qrstr += qrkey_list[c_event.keycode] + count += 1 + if count == self.__numdigits: + r = True + break + self.__dev.ungrab() + return r + return False + +# qr = QRReader() +# if qr.openQR(): +# print(qr.readQRasync(2)) +# qr.closeQR() diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py new file mode 100644 index 0000000..8131a53 --- /dev/null +++ b/test-cli/test/helpers/sdl.py @@ -0,0 +1,126 @@ +import sdl2.ext +from sdl2 import * +import os +import time + +Colors = { + 'red': sdl2.ext.Color(255, 0, 0, 255), + 'green': sdl2.ext.Color(0, 255, 0, 255), + 'blue': sdl2.ext.Color(0, 0, 255, 255), + 'white': sdl2.ext.Color(255, 255, 255, 255), + 'black': sdl2.ext.Color(0, 0, 0, 255), +} + + +class SDL2(object): + __resources = None + __w = 1280 + __h = 720 + __winName = "noname" + __window = None + __factory = None + __renderer = None + __srender = None + + def __init__(self, driver, display, w, h): + os.environ['SDL_VIDEODRIVER'] = "x11" + os.environ['DISPLAY'] = ":0" + # self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files') + sdl2.ext.init() + self.__createWindow() + + def __createWindow(self): + self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0), + flags=( + SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED)) + self.__renderer = sdl2.ext.Renderer(self.__window) + self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer) + self.__srender = self.__factory.create_sprite_render_system(self.__window) + self.__window.show() + SDL_ShowCursor(SDL_DISABLE) + + # def createFont(self, fontName, fontSize, fontColor): + # return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor) + + def WriteText(self, text, x, y, fontManager): + imText = self.__factory.from_text(text, fontmanager=fontManager) + self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1])) + + def show(self): + self.__window.show() + + def hide(self): + self.__window.hide() + + def setColor(self, red, green, blue, alpha): + self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha) + + def fillColor(self, red, green, blue, alpha): + self.setColor(red, green, blue, alpha) + self.__renderer.clear() + + def fillbgColor(self, color, update=False): + if color in Colors: + self.__renderer.color = Colors[color] + self.__renderer.clear() + if update: + self.update() + + def update(self): + self.__renderer.present() + + # def showImage(self, filename): + # im = self.__factory.from_image(self.__resources.get_path(filename)) + # self.__srender.render(im) + + +class SDL2_Test(object): + __sdl2 = None + + def __init__(self, driver, display, w, h): + self.__sdl2 = SDL2(driver, display, w, h) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + +class SDL2_Message(object): + __sdl2 = None + __fontManager_w = None + __fontManager_b = None + + def __init__(self): + self.__sdl2 = SDL2() + self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white']) + self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black']) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + def __write_w(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_w) + + def __write_b(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_b) + + def setTestOK(self, board_uuid, qrdata): + self.Paint('GREEN') + self.__write_b(100, 50, 'TEST OK') + self.__write_b(100, 100, 'uuid={}'.format(board_uuid)) + self.__write_b(100, 150, 'qr={}'.format(qrdata)) + self.__sdl2.update() + + def setTestERROR(self, error): + self.Paint('RED') + self.__write_w(100, 50, 'TEST FAILED') + self.__write_w(100, 100, '{}'.format(error)) + self.__sdl2.update() + diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py index 3fd9fd5..d61d1fb 100644 --- a/test-cli/test/helpers/setup_xml.py +++ b/test-cli/test/helpers/setup_xml.py @@ -1,22 +1,22 @@ import xml.etree.ElementTree as XMLParser class XMLSetup (object): - """aaaaa""" + """XML Setup Parser""" __tree = None # Parser __dbType = None # database connection required: PgSQLConnection __dbConnectionRaw = None # Connection string in raw __dbConnectionStr = None # Connection string to use in sql object connection def __init__(self, filename): - """aaaaa""" + """Parse the file in the constructor""" self.__tree = XMLParser.parse(filename) def __del__(self): - """aaaaa""" + """Destructor do nothing""" pass - def getdbConnectionStr (self): - """aaaaa""" + def getdbConnectionStr(self): + """XML to database connection string""" if self.__dbConnectionRaw is not None: return self.__dbConnectionRaw @@ -29,12 +29,44 @@ class XMLSetup (object): return None - def getPostgresConnectionStr (self): - """aaaaa""" + def getPostgresConnectionStr(self): + """Get Connection string """ str = self.__dbConnectionRaw del str['type'] return str - def getMysqlConnectionStr (self): + def getBoard(self, key, default): + for element in self.__tree.iter('board'): + if key in element.attrib: + return element.attrib[key] + return default + + def getTempPath(self, key, default): + for element in self.__tree.iter('tmpPath'): + if key in element.attrib: + return element.attrib[key] + return default + + def getKeyVal(self, tag, key, default): + for element in self.__tree.iter(tag): + if key in element.attrib: + return element.attrib[key] + return default + + def gettagKey (self, xmltag, xmlkey): """aaaaa""" - pass
\ No newline at end of file + for element in self.__tree.iter(xmltag): + return element.attrib[xmlkey] + + return None + + def getUSBlist(self): + list = {} + count = 0 + for elements in self.__tree.iter("usbdev"): + sublist = {} + for key,val in elements.items(): + sublist[key] = val + list[count] = sublist + count += 1 + return list
\ No newline at end of file diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py index b579e39..7223774 100644 --- a/test-cli/test/helpers/syscmd.py +++ b/test-cli/test/helpers/syscmd.py @@ -1,6 +1,6 @@ import unittest import subprocess -from test.helpers.globalVariables import globalVar +import threading class TestSysCommand(unittest.TestCase): @@ -10,7 +10,7 @@ class TestSysCommand(unittest.TestCase): __outdata = None __outtofile = False - def __init__(self, testname, testfunc, str_cmd, outtofile = False): + def __init__(self, testname, testfunc, str_cmd, outtofile=False): """ init """ super(TestSysCommand, self).__init__(testfunc) self.__str_cmd = str_cmd @@ -42,7 +42,7 @@ class TestSysCommand(unittest.TestCase): else: res = -3 outdata = completed.stdout - self.longMessage=str(outdata).replace("'","") + self.longMessage = str(outdata).replace("'", "") self.assertTrue(True) except subprocess.CalledProcessError as err: self.assertTrue(False) @@ -54,11 +54,14 @@ class TestSysCommand(unittest.TestCase): def remove_file(self): pass + class SysCommand(object): __str_cmd = None __cmdname = None __outdata = None __errdata = None + __returnCode = None + __exception = None def __init__(self, cmdname, str_cmd): """ init """ @@ -66,31 +69,37 @@ class SysCommand(object): self.__cmdname = cmdname def getName(self): - return self.__testname + return self.__cmdname + + def setName(self, cmdName): + self.__cmdname = cmdName + + def getParams(self): + return self.__str_cmd + + def setParam(self, params): + self.__str_cmd = params def execute(self): - res = -1 + # try: + self.__outdata = None + self.__errdata = None + self.__exception = None try: - self.__outdata = None - self.__errdata = None completed = subprocess.run( self.__str_cmd, - check=True, + check=False, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + self.__returnCode = completed.returncode self.__outdata = completed.stdout - if completed.returncode is 0: - res = 0 - if completed.stderr.decode('ascii') != "": - res = -1 - self.__errdata = completed.stderr + self.__errdata = completed.stderr.decode('ascii') except subprocess.CalledProcessError as err: - res = -2 - except Exception as t: - res = -3 - return res + self.__exception = err + self.__returnCode = -1 + return self.__returnCode def getOutput(self): return self.__outdata @@ -98,6 +107,12 @@ class SysCommand(object): def getOutErr(self): return self.__errdata + def getException(self): + return self.__exception + + def IsException(self): + return self.__exception is not None + def getOutputlines(self): return self.__outdata.splitlines() @@ -106,3 +121,26 @@ class SysCommand(object): f.write(self.__outdata) f.close() + +class AsyncSys(threading.Thread): + sysObject = None + sysres = None + + def __init__(self, threadID, name, counter): + threading.Thread.__init__(self) + self.threadID = threadID + self.name = name + self.counter = counter + self.sysObject = SysCommand(name, None) + + def setParams(self, Params): + self.sysObject.setParam(Params) + + def getRes(self): + return self.sysres + + def getData(self): + return self.sysObject.getOutput() + + def run(self): + self.sysres = self.sysObject.execute() diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 94181f9..8e96e8c 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -1,10 +1,10 @@ from test.helpers.psqldb import PgSQLConnection -from test.helpers.setup_xml import XMLSetup -def find_between( s, first, last ): + +def find_between(s, first, last): try: - start = s.index( first ) + len( first ) - end = s.index( last, start ) + start = s.index(first) + len(first) + end = s.index(last, start) return s[start:end] except ValueError: return "" @@ -19,158 +19,242 @@ class TestSrv_Database(object): def __init__(self): pass - def open (self, filename): - '''Open database connection''' - self.__xml_setup = XMLSetup(filename) + def open(self, xmlObj): + self.__xml_setup = xmlObj self.__sqlObject = PgSQLConnection() return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr()) - def create_board(self, processor_id, model_id, variant, bmac = None): + def getConfig(self): + return self.__sqlObject.getConfig() + + def create_board(self, processor_id, model_id, variant, station, bmac=None): '''create a new board''' if bmac is None: - sql = "SELECT isee.create_board('{}', '{}', '{}', NULL);".format(processor_id, model_id, variant) + sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', NULL, '{}');".format(processor_id, model_id, + variant, + station) else: - sql = "SELECT isee.create_board('{}', '{}', '{}', '{}');".format(processor_id, model_id, variant, bmac) - #print('>>>' + sql) + sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', '{}', '{}');".format(processor_id, model_id, + variant, + bmac, station) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + print(r) + print(str(err)) return None - def create_model(self, modid, variant, descr, tgid): - '''create new model''' - sql = "SELECT isee.create_model('{}', '{}', '{}', '{}')".format(modid, variant, descr, tgid) - #print('>>>' + sql) + def get_tests_list(self, board_uuid): + '''get the board test list''' + sql = "SELECT * FROM isee.f_get_tests_list('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def create_test_definition(self, testname, testdesc, testfunc): - '''Create a new definition and return definition id on fail (testname already exist) return -1''' - sql = "SELECT isee.define_test('{}', '{}', '{}')".format(testname, testdesc, testfunc) - #print('>>>' + sql) + def get_test_params_list(self, testid): + sql = "SELECT * FROM isee.f_get_test_params_list({})".format(testid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def add_testdef_to_group(self, testgroupid, testname, testparam): - '''Assign definition to group test return true on success or false if it fails''' - sql = "SELECT isee.add_test_to_group('{}', '{}', '{}')".format(testgroupid, testname, testparam) - #print('>>>' + sql) + def open_test(self, board_uuid): + sql = "SELECT * FROM isee.f_open_test('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_test_list(self, board_uuid): - '''get the board test list''' - sql = "SELECT isee.gettestlist('{}')".format(board_uuid) - #print('>>>' + sql) + def run_test(self, testid_ctl, testid): + sql = "SELECT isee.f_run_test({},{})".format(testid_ctl, testid) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def finish_test(self, testid_ctl, testid, newstatus, textresult): + sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult) + # print('>>>' + sql) + try: + print('finish_test => SQL {}'.format(sql)) + self.__sqlObject.db_execute_query(sql) + return True + except Exception as err: + r = find_between(str(err), '#', '#') + print('finish_test => {}'.format(err)) + return False + + def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype): + try: + # generate a new oid + fileoid = self.__sqlObject.db_upload_large_file(filepath) + # insert into a table + sql = "SELECT isee.f_upload_result_file({},{},'{}','{}','{}')".format(testid_ctl, testid, desc, fileoid, + mimetype) + # print('>>>' + sql) + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def get_task_variables_list(self, board_uuid): + sql = "SELECT * FROM isee.f_get_task_variables_list('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_comp_test_list(self, board_uuid): - '''get the board test list''' - sql = "SELECT isee.gettestcompletelist('{}')".format(board_uuid) - #print('>>>' + sql) + + def get_plc_macaddr(self, board_uuid): + sql = "SELECT * FROM isee.f_get_plcmac(\'{}\')".format(board_uuid) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + return res, '' + except Exception as err: + r = find_between(str(err), '#', '#') + print(r) + return None, r + + + def get_board_macaddr(self, board_uuid): + sql = "SELECT * FROM isee.f_get_board_macaddr('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def open_testbatch(self, board_uuid): - '''get the board test list''' - sql = "SELECT isee.open_testbatch('{}')".format(board_uuid) - #print('>>>' + sql) + def open_task(self, uuid): + sql = "SELECT * FROM isee.f_open_task('{}')".format(uuid) + # print('>>>' + sql) try: - res = str(self.__sqlObject.db_execute_query(sql)[0]) - res = res.replace('(', '') - res = res.replace(')', '') - res = res.replace(',', '') - #print(res) - return res; + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def add_test_to_batch(self, board_uuid, testid, testid_ctl, result, groupid, data): - '''get the board test list''' - sql = "SELECT isee.add_test_to_batch_c('{}','{}','{}','{}','{}','{}')".format(board_uuid, testid, testid_ctl, result, groupid, data) - #print('>>>' + sql) + def create_task_result(self, taskid_ctl, name, newstatus, newinfo=None): + sql = "SELECT isee.f_create_task_result({},'{}','{}','{}')".format(taskid_ctl, name, newstatus, newinfo) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def update_taskctl_status(self, taskid_ctl, newstatus): + sql = "SELECT isee.f_update_taskctl_status({},'{}')".format(taskid_ctl, newstatus) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def set_factorycode(self, uuid, factorycode): + sql = "SELECT isee.f_set_factorycode('{}','{}')".format(uuid, factorycode) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def read_station_state(self, station): + sql = "SELECT * FROM station.getmystate('{}');".format(station) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def close_testbatch(self, board_uuid, testid_ctl): - '''get the board test list''' - sql = "SELECT isee.close_testbatch('{}','{}')".format(board_uuid, testid_ctl) - #print('>>>' + sql) + def change_station_state(self, station, newstate): + sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station) + print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + print(r) return None - def update_set_test_row(self, nstation, testid_ctl, board_uuid, ctest, cstatus): - '''get the board test list''' - sql = "SELECT isee.update_set_test_row('{}','{}','{}','{}','{}')".format(nstation, testid_ctl ,board_uuid, ctest, cstatus) - #print('>>>' + sql) + def get_setup_variable(self, skey): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_eeprom(self, board_uuid): - '''get the board eeprom struct ''' - sql = "SELECT isee.getboard_eeprom('{}')".format(board_uuid) - #print('>>>' + sql) + def get_setup_variable(self, skey , default): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return default + + def setDevelStationState(self, station, newState): + sql = "UPDATE station.station_state SET state = {} WHERE hostname={}".format(newState, station) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None -
\ No newline at end of file diff --git a/test-cli/test/helpers/uboot_flasher.py b/test-cli/test/helpers/uboot_flasher.py deleted file mode 100644 index e69de29..0000000 --- a/test-cli/test/helpers/uboot_flasher.py +++ /dev/null diff --git a/test-cli/test/helpers/usb.py b/test-cli/test/helpers/usb.py new file mode 100644 index 0000000..8f8848d --- /dev/null +++ b/test-cli/test/helpers/usb.py @@ -0,0 +1,270 @@ +from test.helpers.syscmd import SysCommand +from test.helpers.amper import Amper +import scanf +import os +import json + + +class USBDevices(object): + __dev_list = {} + __serial_converter = {} + __usb_disk = {} + __usb_amper = {} + __dev_list_filtered = {} + __usb_mstorage = {} + __xml_config = None + + def __init__(self, xml): + self.__xml_config = xml + self.__getusblist() + self.__getSerialDevices() + self.__getAmper() + self.__getMassStorage() + + def print_usb(self): + print("------ usb list -------") + print(json.dumps(self.__dev_list)) + print("------ serial converter list -------") + print(self.__serial_converter) + print("------ usb disk list -------") + print(self.__usb_disk) + print("------ usb amper list -------") + print(self.__usb_amper) + print("------ usb mass storage list -------") + print(self.__usb_mstorage) + print("------ filtered list -------") + print(json.dumps(self.__dev_list_filtered)) + + def refresh(self): + self.__dev_list = {} + self.__serial_converter = {} + self.__usb_disk = {} + self.__usb_amper = {} + self.__usb_mstorage = {} + self.__dev_list_filtered = {} + self.__getusblist() + self.__getSerialDevices() + self.__getAmper() + self.__getMassStorage() + + def getMassStorage(self): + return self.__usb_mstorage + + def getAmper(self): + return self.__usb_amper + + def __getAmper(self): + for c, s_items in self.__serial_converter.items(): + for k,v in s_items.items(): + if k.find("USB") != -1 and v == "Prolific_Technology_Inc._USB-Serial_Controller": + testAmper = Amper(port=k) + testAmper.open() + res = testAmper.hello() + if res: + self.__usb_amper = {'port': k, 'ver': testAmper.getVersion()} + testAmper.close() + if res: + return + + def __getMassStorage(self): + for c, s_items in self.__usb_disk.items(): + for k,v in s_items.items(): + if k == "udev": + if v.get('ID_MODEL', None) == "File-Stor_Gadget" or v.get('ID_FS_LABEL', None) != "NODE_L031K6": + self.__usb_mstorage = {'disk': v.get('DEVNAME', ''), 'id': v.get('ID_FS_LABEL_ENC', '')} + + + def __check_ignore_usb_list(self, usb_list, dev_list): + count = 0 + ignore = "0" + for _, l in usb_list.items(): + for k, v in l.items(): + if k == "ignore": + ignore = v + else: + value = dev_list.get(k, None) + if v == value: + count += 1 + else: + count = 0 + break + if count >= len(l) -1: + break + + if count > 0: + if ignore == "1": + return True + return False + + + def __create_ignore_list(self, usb_list): + count = 1 + for _, dev in self.__dev_list.items(): + if not self.__check_ignore_usb_list(usb_list, dev): + self.__dev_list_filtered[count] = dev + count += 1 + + def __getusblist(self): + count = 0 + res = open('/sys/kernel/debug/usb/devices', 'rb').read().decode() + list = {} + for i in res.split('\n'): + if i.find('T:') != -1: + if len(list) > 0: + count += 1 + self.__dev_list[count] = list + list = self.__parse_T(i) + elif i.find('D:') != -1: + list = {**list, **self.__parse_D(i)} + elif i.find('P:') != -1: + list = {**list, **self.__parse_P(i)} + elif i.find('S:') != -1: + list = {**list, **self.__parse_S(i)} + else: + """ Ignore all rest""" + pass + + if count > 0: + count += 1 + self.__dev_list[count] = list + + self.__create_ignore_list(self.__xml_config.getUSBlist()) + + + def __parse_T(self, str): + data = {} + res = scanf.scanf("T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c", str) + data["BUS"] = res[0] + data["LEV"] = res[1] + data["PRNT"] = res[2] + data["PORT"] = res[3] + data["CNT"] = res[4] + data["DEV"] = res[5].lstrip() + data["SPEED"] = res[6].rstrip() + data["MXCH"] = res[7].lstrip() + return data + + def __parse_D(self, str): + data = {} + str += ' ' + res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", str) + data["VER"] = res[0].lstrip() + data["CLASS"] = res[1] + return data + + def __parse_P(self, str): + data = {} + str += ' ' + res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", str) + data["VENDOR"] = res[0].lstrip() + data["PRODID"] = res[1] + return data + + def __parse_S(self, str): + data = {} + if str.find('Manufacturer') != -1: + pos = str.find('Manufacturer=') + data["MANUFACTURER"] = str[pos + len('Manufacturer='):].rstrip() + elif str.find('Product') != -1: + pos = str.find('Product=') + data["PRODUCT"] = str[pos + len('Product='):].rstrip() + elif str.find('SerialNumber') != -1: + pos = str.find('SerialNumber=') + data["SERIAL"] = str[pos + len('SerialNumber='):].rstrip() + else: + pass + return data + + def finddevicebytag(self, tag, str): + t = {} + count = 0 + for i, k in self.__dev_list.items(): + if tag in k: + if k[tag].find(str) != -1: + t[count] = k + count += 1 + return t + + def __getSerialDevices(self): + self.__serial_converter = {} + self.__usb_disk = {} + self.__usb_amper = {} + test_myPath = os.path.dirname(os.path.abspath(__file__)) + test_myPath = test_myPath + "/../" + s = SysCommand('usb', '{}/scripts/usb.sh'.format(test_myPath)) + s.execute() + data_list = s.getOutput().decode() + for i in data_list.split('\n'): + if len(i) > 0: + pos_div = i.find('-') + dev = i[0:pos_div - 1] + devName = i[pos_div + 2:] + # print("dev: {}".format(dev)) + # print("devName: {}".format(devName)) + res = scanf.scanf("/dev/sd%c", dev) + if res is not None: + dev_id = res[0][0] + # Now verify if is Disk or Partition + res = scanf.scanf("/dev/sd%c%c", dev) + if res is not None: + # this is a partition and we can ignore it + pass + else: + self.__usb_disk[dev_id] = {dev: devName, 'udev': self.__getudevquery(dev)} + else: + res = scanf.scanf("/dev/ttyUSB%c", dev) + if res is not None: + serial_id = res[0][0] + self.__serial_converter[serial_id] = {dev: devName, 'udev': self.__getudevquery(dev)} + + def __getudevquery(self, device): + list = {} + s = SysCommand('query-udev', 'udevadm info --query=all -n {}'.format(device)) + res = s.execute() + if res != 0: + return None + for i in s.getOutput().decode().split('\n'): + if i.find('P:') != -1: + list = self.__parse_udev_P(i) + elif i.find('S:') != -1: + list = {**list, **self.__parse_udev_S(i)} + elif i.find('N:') != -1: + list = {**list, **self.__parse_udev_N(i)} + elif i.find('S:') != -1: + list = {**list, **self.__parse_udev_S(i)} + elif i.find('E:') != -1: + list = {**list, **self.__parse_udev_E(i)} + return list + + def __parse_udev_P(self, str): + pos = str.find('P: ') + str = str[len('P: ') + pos:] + return {'PATH': str} + + def __parse_udev_S(self, str): + pos = str.find('P: ') + str = str[len('P: ') + pos:] + pos = str.find('disk/by-id/') + if pos != -1: + return {'DISK_BY_ID': str[len('disk/by-id/') + pos:]} + pos = str.find('disk/by-path/') + if pos != -1: + return {'DISK_BY_PATH': str[len('disk/by-path/') + pos:]} + pos = str.find('disk/by-uuid/') + if pos != -1: + return {'DISK_BY_UUID': str[len('disk/by-uuid/') + pos:]} + else: + return {} + + def __parse_udev_N(self, str): + pos = str.find('N: ') + str = str[len('N: ') + pos:] + return {'DEV_NAME': str} + + def __parse_udev_E(self, str): + pos = str.find('E: ') + str = str[len('E: ') + pos:] + pos = str.find('=') + str_key = str[:pos] + str_value = str[pos + 1:] + return {str_key: str_value} diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py new file mode 100644 index 0000000..8d2c27b --- /dev/null +++ b/test-cli/test/helpers/utils.py @@ -0,0 +1,68 @@ +import json +import socket +import os +import scanf + +def save_json_to_disk(filePath, description, mime, json_data, result): + with open(filePath, 'w') as outfile: + json.dump(json_data, outfile, indent=4) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_file_to_disk(filePath, description, mime, data, result): + with open(filePath, 'w') as outfile: + outfile.write(data) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_result (filePath, description, mime, result): + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + + +def str2bool(v): + return v.lower() in ("yes", "true", "t", "1") + +def station2Port(base): + Name = socket.gethostname() + res = scanf.scanf("Station%d", Name) + if res[0]: + NmBase = int(base) + res = int(res[0]) + return str(NmBase + res) + return base + + +def sys_read(sysnode): + try: + f = open(sysnode, "r") + data = f.readline() + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + +def removefileIfExist(fname): + if os.path.exists(fname): + try: + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' |