Coverage for plugins/datasources/sqlite/sqlite.py: 77%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any 

24 

25from lib.plugins import AbstractDataSourcePlugin 

26from datetime import datetime 

27import sqlite3 

28 

# Names the plugin class this module provides ("DatasourceSqlite").
# NOTE(review): presumably read by the Hermes plugin loader to locate the
# class to instantiate — confirm against lib.plugins.
HERMES_PLUGIN_CLASSNAME: str | None = "DatasourceSqlite"
"""The plugin class name defined in this module file"""

31 

32 

class DatasourceSqlite(AbstractDataSourcePlugin):
    """Remote Data Source for SQLite database"""

    def __init__(self, settings: dict[str, Any]):
        """Instantiate new plugin and store a copy of its settings dict in
        self._settings (presumably done by the parent constructor, as this
        class never assigns it itself).

        Also registers the datetime adapter/converter pair defined below with
        the sqlite3 module, so datetime values are written as ISO 8601 strings
        and columns declared as "datetime" are read back as datetime objects.
        """
        super().__init__(settings)
        # No connection until open() is called
        self._dbcon: sqlite3.Connection | None = None
        # These registrations are done on the sqlite3 module itself, not on a
        # particular connection
        sqlite3.register_adapter(datetime, self.adapt_datetime_iso)
        sqlite3.register_converter("datetime", self.convert_datetime)

    @staticmethod
    def adapt_datetime_iso(val: datetime) -> str:
        """Convert datetime to ISO 8601 datetime string.

        A naive datetime produces a string without timezone; an aware datetime
        includes its UTC offset, as datetime.isoformat() does."""
        return val.isoformat()

    @staticmethod
    def convert_datetime(val: bytes) -> datetime:
        """Convert ISO 8601 datetime string (as raw bytes handed over by
        sqlite3) to datetime object"""
        return datetime.fromisoformat(val.decode())

    def open(self):
        """Establish connection with SQLite Database"""
        self._dbcon = sqlite3.connect(
            database=self._settings["uri"],
            # Required for the "datetime" converter to be applied, based on
            # the declared column type
            detect_types=sqlite3.PARSE_DECLTYPES,
        )

    def close(self):
        """Close connection with SQLite Database.

        Does nothing when no connection is open, so it is safe to call it
        before open() or several times in a row."""
        if self._dbcon is not None:
            self._dbcon.close()
            self._dbcon = None

    def fetch(
        self,
        query: str | None,
        vars: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Fetch data from datasource with specified query and optional queryvars.
        Returns a list of dict containing each entry fetched, with REMOTE_ATTRIBUTES
        (the column names reported by the cursor) as keys, and corresponding
        fetched values as values.

        Returns an empty list when the statement produces no result set."""
        cur = self._dbcon.cursor()
        try:
            cur.execute(query, vars)
            # cur.description is None for statements that return no rows
            # (anything that is not a query): treat it as "no columns"
            columns = [col[0] for col in cur.description or ()]
            return [dict(zip(columns, row)) for row in cur]
        finally:
            cur.close()

    def add(self, query: str | None, vars: dict[str, Any]):
        """Add data to datasource with specified query and optional queryvars"""
        self._execute_and_commit(query, vars)

    def delete(self, query: str | None, vars: dict[str, Any]):
        """Delete data from datasource with specified query and optional queryvars"""
        self._execute_and_commit(query, vars)

    def modify(self, query: str | None, vars: dict[str, Any]):
        """Modify data on datasource with specified query and optional queryvars"""
        self._execute_and_commit(query, vars)

    def _execute_and_commit(self, query: str | None, vars: dict[str, Any]):
        """Run a single write statement and commit it.

        Shared implementation of add(), delete() and modify(). When the
        statement fails, the pending transaction is rolled back before the
        exception is propagated, so no half-open transaction is left behind."""
        cur = self._dbcon.cursor()
        try:
            # Connection used as a context manager: commit on success,
            # rollback if an exception is raised inside the block
            with self._dbcon:
                cur.execute(query, vars)
        finally:
            cur.close()