Coverage for plugins/datasources/sqlite/sqlite.py: 77%
44 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any
25from lib.plugins import AbstractDataSourcePlugin
26from datetime import datetime
27import sqlite3
29HERMES_PLUGIN_CLASSNAME: str | None = "DatasourceSqlite"
30"""The plugin class name defined in this module file"""
33class DatasourceSqlite(AbstractDataSourcePlugin):
34 """Remote Data Source for SQLite database"""
36 def __init__(self, settings: dict[str, Any]):
37 """Instantiate new plugin and store a copy of its settings dict in
38 self._settings"""
39 super().__init__(settings)
40 self._dbcon: sqlite3.Connection | None = None
41 sqlite3.register_adapter(datetime, self.adapt_datetime_iso)
42 sqlite3.register_converter("datetime", self.convert_datetime)
44 @staticmethod
45 def adapt_datetime_iso(val: datetime) -> str:
46 """Convert datetime to ISO 8601 datetime string without timezone"""
47 return val.isoformat()
49 @staticmethod
50 def convert_datetime(val: bytes) -> datetime:
51 """Convert ISO 8601 datetime string to datetime object"""
52 return datetime.fromisoformat(val.decode())
54 def open(self):
55 """Establish connection with SQLite Database"""
56 self._dbcon = sqlite3.connect(
57 database=self._settings["uri"],
58 detect_types=sqlite3.PARSE_DECLTYPES,
59 )
61 def close(self):
62 """Close connection with SQLite Database"""
63 self._dbcon.close()
64 self._dbcon = None
66 def fetch(
67 self,
68 query: str | None,
69 vars: dict[str, Any],
70 ) -> list[dict[str, Any]]:
71 """Fetch data from datasource with specified query and optional queryvars.
72 Returns a list of dict containing each entry fetched, with REMOTE_ATTRIBUTES
73 as keys, and corresponding fetched values as values"""
74 fetcheddata = []
75 cur = self._dbcon.cursor()
76 cur.execute(query, vars)
77 columns = [col[0] for col in cur.description]
78 cur.row_factory = lambda _, args: dict(zip(columns, args))
79 for row in cur:
80 fetcheddata.append(row)
81 return fetcheddata
83 def add(self, query: str | None, vars: dict[str, Any]):
84 """Add data to datasource with specified query and optional queryvars"""
85 cur = self._dbcon.cursor()
86 cur.execute(query, vars)
87 self._dbcon.commit()
89 def delete(self, query: str | None, vars: dict[str, Any]):
90 """Delete data from datasource with specified query and optional queryvars"""
91 cur = self._dbcon.cursor()
92 cur.execute(query, vars)
93 self._dbcon.commit()
95 def modify(self, query: str | None, vars: dict[str, Any]):
96 """Modify data on datasource with specified query and optional queryvars"""
97 cur = self._dbcon.cursor()
98 cur.execute(query, vars)
99 self._dbcon.commit()