Coverage for plugins/messagebus_producers/sqlite/sqlite.py: 100%
39 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from lib.plugins import AbstractMessageBusProducerPlugin
24from lib.datamodel.event import Event
26from datetime import datetime, timedelta
27from typing import Any
28import sqlite3
30HERMES_PLUGIN_CLASSNAME: str | None = "SqliteProducerPlugin"
31"""The plugin class name defined in this module file"""
class SqliteProducerPlugin(AbstractMessageBusProducerPlugin):
    """Message bus producer plugin that writes events to an sqlite database.

    Allows Hermes-server to emit its events to an sqlite database (useful for
    tests, not heavily tested for production use).

    Settings keys read by this class:
      - "uri": value given to sqlite3.connect() as the database to open
      - "retention_in_days": number of days after which stored events are
        purged when the connection is opened
    """

    def __init__(self, settings: dict[str, Any]):
        """Create the plugin instance, without opening the database yet.

        The settings dict is forwarded to the parent constructor, which is
        expected to keep a copy of it in self._settings.
        """
        super().__init__(settings)
        # Stays None until open() is called, and is reset to None by close()
        self._db: sqlite3.Connection | None = None

    def open(self) -> Any:
        """Connect to the message bus database and prepare it.

        Once connected, the events table is created when it is missing, then
        the events older than the retention period are purged.
        """
        db_location = self._settings["uri"]
        self._db = sqlite3.connect(db_location)
        self.__initdb()
        self.__purgeOldEvents()

    def close(self):
        """Commit, then close the message bus database connection, if any"""
        if not self._db:
            # Nothing is open: nothing to do
            return

        self._db.commit()
        self._db.close()
        # Drop the closed connection, then restore the "not connected" marker
        del self._db
        self._db = None

    def _send(self, event: Event):
        """Store the specified event in the message bus database.

        The event is saved as its JSON serialization, along with the
        timestamp of the moment it was sent, and committed immediately.
        """
        row = {
            "data": event.to_json(),
            "timestamp": datetime.now().timestamp(),
        }
        self._db.execute(
            "INSERT INTO hermesmessages (data, timestamp) VALUES (:data, :timestamp)",
            row,
        )
        self._db.commit()

    def __initdb(self):
        """Create the table holding the events, unless it already exists"""
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS hermesmessages ("
            "msgid INTEGER PRIMARY KEY AUTOINCREMENT, "
            "data TEXT NOT NULL, timestamp REAL NOT NULL)"
        )
        self._db.commit()

    def __purgeOldEvents(self):
        """Delete the events older than the "retention_in_days" setting, then
        VACUUM the database"""
        current_time = datetime.now()
        max_age = timedelta(days=self._settings["retention_in_days"])
        # Events whose timestamp is strictly lower than this one are removed
        oldest_kept = (current_time - max_age).timestamp()

        self._db.execute(
            "DELETE FROM hermesmessages WHERE timestamp < :purgelimit",
            {"purgelimit": oldest_kept},
        )
        self._db.commit()

        # VACUUM is only issued once the deletion above has been committed
        self._db.execute("VACUUM")
        self._db.commit()