Coverage for plugins/datasources/oracle/oracle.py: 38%
42 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any
25from lib.plugins import AbstractDataSourcePlugin
26import oracledb
# Must stay in sync with the name of the plugin class defined in this module.
# NOTE(review): presumably read by the Hermes plugin loader to locate the class
# to instantiate - confirm against lib.plugins.
HERMES_PLUGIN_CLASSNAME: str | None = "DatasourceOracle"
"""The plugin class name defined in this module file"""
class DatasourceOracle(AbstractDataSourcePlugin):
    """Remote Data Source for Oracle database.

    Thin wrapper around a single python-oracledb connection: ``open()``
    creates it from the plugin settings, ``fetch()`` runs a query and returns
    its rows as dicts, ``add()`` / ``delete()`` / ``modify()`` run a statement
    and commit, and ``close()`` releases the connection.
    """

    def __init__(self, settings: dict[str, Any]):
        """Instantiate new plugin and store a copy of its settings dict in
        self._settings"""
        super().__init__(settings)
        # Live connection handle; None while no connection is open.
        self._dbcon: oracledb.connection.Connection | None = None

        # Returns strings or bytes instead of a LOB locator.
        # This is a module-level python-oracledb default, so it applies to
        # every connection created through oracledb in this process.
        oracledb.defaults.fetch_lobs = False

    def open(self) -> None:
        """Establish connection with Oracle Database.

        Reads the mandatory "login", "password", "server" and "port" settings,
        plus the optional "service_name" and "sid" settings when present.

        Raises:
            KeyError: if one of the mandatory settings is missing.
        """
        params = {
            "user": self._settings["login"],
            "password": self._settings["password"],
            "host": self._settings["server"],
            "port": self._settings["port"],
        }
        # Optional ways of designating the target database; forwarded only
        # when they are present in the settings.
        for optional_key in ("service_name", "sid"):
            if optional_key in self._settings:
                params[optional_key] = self._settings[optional_key]

        cp = oracledb.ConnectParams(**params)
        self._dbcon = oracledb.connect(params=cp)

    def close(self) -> None:
        """Close connection with Oracle Database.

        Does nothing when no connection is open, so it is safe to call after a
        failed open() or more than once.
        """
        if self._dbcon is None:
            return
        try:
            self._dbcon.close()
        finally:
            # Drop the handle even if the driver failed to close it cleanly,
            # so a stale connection object is never reused.
            self._dbcon = None

    def fetch(
        self,
        query: str | None,
        vars: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Fetch data from datasource with specified query and optional queryvars.

        Returns a list of dict containing each entry fetched, with
        REMOTE_ATTRIBUTES as keys, and corresponding fetched values as values.
        Returns an empty list when the statement produces no result set.
        No commit is performed by this method.
        """
        with self._dbcon.cursor() as cur:
            cur.execute(query, vars)
            # description is None when the statement is not a query: there
            # are no columns to name and no rows to fetch.
            if cur.description is None:
                return []
            columns = [col[0] for col in cur.description]
            # Have the driver build each row as a {column name: value} dict.
            cur.rowfactory = lambda *args: dict(zip(columns, args))
            return cur.fetchall()

    def _execute_and_commit(self, query: str | None, vars: dict[str, Any]) -> None:
        """Run a single statement with its bind variables, then commit."""
        with self._dbcon.cursor() as cur:
            cur.execute(query, vars)
        self._dbcon.commit()

    def add(self, query: str | None, vars: dict[str, Any]) -> None:
        """Add data to datasource with specified query and optional queryvars"""
        self._execute_and_commit(query, vars)

    def delete(self, query: str | None, vars: dict[str, Any]) -> None:
        """Delete data from datasource with specified query and optional queryvars"""
        self._execute_and_commit(query, vars)

    def modify(self, query: str | None, vars: dict[str, Any]) -> None:
        """Modify data on datasource with specified query and optional queryvars"""
        self._execute_and_commit(query, vars)