Coverage for plugins/messagebus_producers/sqlite/sqlite.py: 100%

39 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from lib.plugins import AbstractMessageBusProducerPlugin 

24from lib.datamodel.event import Event 

25 

26from datetime import datetime, timedelta 

27from typing import Any 

28import sqlite3 

29 

30HERMES_PLUGIN_CLASSNAME: str | None = "SqliteProducerPlugin" 

31"""The plugin class name defined in this module file""" 

32 

33 

class SqliteProducerPlugin(AbstractMessageBusProducerPlugin):
    """Sqlite message bus producer plugin, to allow Hermes-server to emit events
    to an sqlite database (useful for tests, not heavily tested for production use)

    Events are stored as rows of the ``hermesmessages`` table, one row per
    event, with the event JSON in ``data`` and the emission time (POSIX
    timestamp) in ``timestamp``.

    Settings keys read by this plugin:
        uri: database argument handed to ``sqlite3.connect``.
        retention_in_days: rows older than this many days are deleted each
            time the connection is opened.
    """

    def __init__(self, settings: dict[str, Any]):
        """Instantiate new plugin.

        The settings dict is handed to the parent class; this plugin later
        reads it through ``self._settings``. No database connection is
        established until ``open()`` is called.
        """
        super().__init__(settings)
        # Stays None whenever no connection is currently open.
        self._db: sqlite3.Connection | None = None

    def open(self) -> Any:
        """Establish connection with messagebus.

        Connects to the database designated by the ``uri`` setting, creates
        the events table when it is missing, then purges expired events.
        """
        # Reopening must not leak the connection of a previous open().
        if self._db is not None:
            self.close()

        self._db = sqlite3.connect(database=self._settings["uri"])
        self.__initdb()
        self.__purgeOldEvents()

    def close(self):
        """Close connection with messagebus.

        Pending changes are committed first. Does nothing when no connection
        is open, so it is safe to call several times.
        """
        if self._db is None:
            return

        # Detach the connection from the instance before touching it, so the
        # plugin is left in the "closed" state even if commit/close raises.
        db, self._db = self._db, None
        try:
            db.commit()
        finally:
            db.close()

    def _send(self, event: Event):
        """Send specified event to message bus.

        Inserts one row holding the JSON form of ``event`` and the current
        time, and commits immediately. ``open()`` must have been called
        before, as the connection is used without further check.
        """
        data = {"data": event.to_json(), "timestamp": datetime.now().timestamp()}
        sql = "INSERT INTO hermesmessages (data, timestamp) VALUES (:data, :timestamp)"
        self._db.execute(sql, data)
        self._db.commit()

    def __initdb(self):
        """Create sqlite table when necessary"""
        sql = (
            "CREATE TABLE IF NOT EXISTS hermesmessages ("
            "msgid INTEGER PRIMARY KEY AUTOINCREMENT, "
            "data TEXT NOT NULL, timestamp REAL NOT NULL)"
        )
        self._db.execute(sql)
        self._db.commit()

    def __purgeOldEvents(self):
        """Purge old events from sqlite database.

        Deletes every row whose timestamp is older than ``retention_in_days``
        days before now, then compacts the database file.
        """
        now = datetime.now()
        retention = timedelta(days=self._settings["retention_in_days"])

        data = {"purgelimit": (now - retention).timestamp()}

        sql = "DELETE FROM hermesmessages WHERE timestamp < :purgelimit"
        self._db.execute(sql, data)
        # VACUUM cannot run while a transaction is open, so the DELETE is
        # committed before it.
        self._db.commit()
        self._db.execute("VACUUM")
        self._db.commit()

86 self._db.commit()