summaryrefslogtreecommitdiff
path: root/test-cli/test/helpers/psqldb.py
blob: 014141439c306d5f8a38c5ab390e2456e0878226 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import psycopg2
import psycopg2.extras


class PgSQLConnection(object):
    """Postgres Connection Object"""

    __conection_object = None
    __db_config = {'dbname': 'testsrv', 'host': '192.168.2.171',
                   'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}

    def __init__(self, connect_str=None):
        self.__conection_object = None
        if connect_str is not None:
            self.__db_config = connect_str
        else:
            self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171',
                                'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'}

    def db_connect(self, connect_str=None):
        result = False
        try:
            if connect_str == None:
                self.__conection_object = psycopg2.connect(**self.__db_config)
            else:
                self.__db_config = connect_str
                self.__conection_object = psycopg2.connect(**self.__db_config)

            self.__conection_object.autocommit = True
            result = True
        except Exception as error:
            print(error)
        return result

    def getConfig(self):
        return self.__db_config

    def db_execute_query(self, query):
        cur = self.__conection_object.cursor()
        cur.execute(query)
        data = cur.fetchall()
        cur.close()
        return data

    def db_upload_large_file(self, filepath):
        # a new connection must be created to use large objects
        __conn = psycopg2.connect(**self.__db_config)
        lobj = __conn.lobject(oid=0, mode="rw", new_oid=0, new_file=filepath)
        fileoid = lobj.oid
        lobj.close()
        __conn.commit()
        __conn.close()
        return fileoid

    def commit(self):
        self.__conection_object.commit()

    def db_close(self):
        if self.__conection_object is not None:
            self.__conection_object.close()
            del self.__conection_object
            self.__conection_object = None

    def __del__(self):
        if self.__conection_object is not None:
            self.__conection_object.close()
            # print("disconnected")