Coverage for plugins/datasources/oracle/oracle.py: 38%

42 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:25 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any 

24 

25from lib.plugins import AbstractDataSourcePlugin 

26import oracledb 

27 

28HERMES_PLUGIN_CLASSNAME: str | None = "DatasourceOracle" 

29"""The plugin class name defined in this module file""" 

30 

31 

32class DatasourceOracle(AbstractDataSourcePlugin): 

33 """Remote Data Source for Oracle database""" 

34 

35 def __init__(self, settings: dict[str, Any]): 

36 """Instantiate new plugin and store a copy of its settings dict in 

37 self._settings""" 

38 super().__init__(settings) 

39 self._dbcon: oracledb.connection.Connection | None = None 

40 

41 # returns strings or bytes instead of a LOB locator 

42 oracledb.defaults.fetch_lobs = False 

43 

44 def open(self): 

45 """Establish connection with Oracle Database""" 

46 params = { 

47 "user": self._settings["login"], 

48 "password": self._settings["password"], 

49 "host": self._settings["server"], 

50 "port": self._settings["port"], 

51 } 

52 if "service_name" in self._settings: 

53 params["service_name"] = self._settings["service_name"] 

54 if "sid" in self._settings: 

55 params["sid"] = self._settings["sid"] 

56 

57 cp = oracledb.ConnectParams(**params) 

58 self._dbcon = oracledb.connect(params=cp) 

59 

60 def close(self): 

61 """Close connection with Oracle Database""" 

62 self._dbcon.close() 

63 self._dbcon = None 

64 

65 def fetch( 

66 self, 

67 query: str | None, 

68 vars: dict[str, Any], 

69 ) -> list[dict[str, Any]]: 

70 """Fetch data from datasource with specified query and optional queryvars. 

71 Returns a list of dict containing each entry fetched, with REMOTE_ATTRIBUTES 

72 as keys, and corresponding fetched values as values""" 

73 fetcheddata = [] 

74 with self._dbcon.cursor() as cur: 

75 cur.execute(query, vars) 

76 columns = [col[0] for col in cur.description] 

77 cur.rowfactory = lambda *args: dict(zip(columns, args)) 

78 for row in cur: 

79 fetcheddata.append(row) 

80 return fetcheddata 

81 

82 def add(self, query: str | None, vars: dict[str, Any]): 

83 """Add data to datasource with specified query and optional queryvars""" 

84 with self._dbcon.cursor() as cur: 

85 cur.execute(query, vars) 

86 self._dbcon.commit() 

87 

88 def delete(self, query: str | None, vars: dict[str, Any]): 

89 """Delete data from datasource with specified query and optional queryvars""" 

90 with self._dbcon.cursor() as cur: 

91 cur.execute(query, vars) 

92 self._dbcon.commit() 

93 

94 def modify(self, query: str | None, vars: dict[str, Any]): 

95 """Modify data on datasource with specified query and optional queryvars""" 

96 with self._dbcon.cursor() as cur: 

97 cur.execute(query, vars) 

98 self._dbcon.commit()