Coverage for plugins/messagebus_consumers/sqlite/sqlite.py: 70%
97 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any, Iterable
25from lib.plugins import AbstractMessageBusConsumerPlugin
26from lib.datamodel.event import Event
28from datetime import datetime, timedelta
29import sqlite3
30import time
# Module-level pointer to the plugin class implemented below.
# NOTE(review): presumably read by the Hermes plugin loader to find the class
# to instantiate — the loader is not visible from this file, confirm.
HERMES_PLUGIN_CLASSNAME: str | None = "SqliteConsumerPlugin"
"""The plugin class name defined in this module file"""
class SqliteConsumerPlugin(AbstractMessageBusConsumerPlugin):
    """Sqlite message bus consumer plugin, to allow Hermes-clients to fetch events
    from an sqlite database (useful for tests, not heavily tested for production use).

    Events are read from the ``hermesmessages`` table in ``msgid`` order. The
    database is opened read-only and lazily: every method that needs it tries to
    open it first, and copes with the file not existing yet.
    """

    # Delay (in seconds) between two polls of the database when no event is
    # available
    _POLL_INTERVAL: float = 0.5

    def __init__(self, settings: dict[str, Any]):
        """Instantiate new plugin and store a copy of its settings dict in
        self._settings"""
        super().__init__(settings)
        self._db: sqlite3.Connection | None = None
        # Next msgid to read. None until seek() or seekToBeginning() is called
        self.__curoffset: int | None = None
        # Timeout in seconds (setTimeout() stores a float), or None to wait forever
        self._timeout: float | None = 1

    def open(self) -> Any:
        """Establish connection with messagebus.

        The database is opened read-only. If it can't be opened, the failure is
        logged and self._db is left untouched, so callers must check self._db
        afterwards.
        """
        database = f"file:{self._settings['uri']}?mode=ro"
        try:
            self._db = sqlite3.connect(database=database, uri=True)
        except sqlite3.OperationalError:
            # No db file found
            __hermes__.logger.info(
                f"Sqlite bus file '{self._settings['uri']}' doesn't exist yet"
            )
        else:
            # Rows are accessed by column name everywhere in this class
            self._db.row_factory = sqlite3.Row

    def close(self):
        """Close connection with messagebus"""
        if self._db is not None:
            self._db.close()
        self._db = None

    def seekToBeginning(self):
        """Seek to first (older) event in message bus queue.

        When there is no database or no message yet, the offset is set to -1 so
        that any message stored later will be consumed.
        """
        if self._db is None:
            self.open()
        if self._db is None:
            self.__curoffset = -1
            return

        sql = "SELECT MIN(msgid) AS msgid FROM hermesmessages"
        entry = self._db.execute(sql).fetchone()
        # An aggregate query always returns one row: on an empty table MIN() is
        # NULL, and a NULL offset would never match "msgid >= :offset"
        if entry is None or entry["msgid"] is None:
            self.__curoffset = -1
        else:
            self.__curoffset = entry["msgid"]

    def seek(self, offset: Any):
        """Seek to specified offset event in message bus queue.

        The offset is accepted when it lies between the oldest stored msgid and
        the next msgid that will be generated (both included).

        Raises:
            IndexError: the database is unavailable, or offset is out of bounds.
            IOError: the limits can't be determined from the database.
        """
        if self._db is None:
            self.open()
        if self._db is None:
            raise IndexError(
                f"Specified offset '{offset}' doesn't exists in bus (bus is empty)"
            )

        # sqlite_sequence is the SQLite internal table storing the last
        # AUTOINCREMENT value generated for each table
        sql = (
            "SELECT "
            "  min(hermesmessages.msgid) AS minmsgid, "
            "  max(hermesmessages.msgid) AS maxmsgid, "
            "  max(sqlite_sequence.seq)+1 AS nextmsgid "
            "FROM hermesmessages, sqlite_sequence "
            "WHERE sqlite_sequence.name = 'hermesmessages'"
        )
        entry = self._db.execute(sql).fetchone()
        if entry is None:
            raise IOError("Bus database seems invalid")

        if any(
            entry[column] is None
            for column in ("minmsgid", "maxmsgid", "nextmsgid")
        ):
            # The bus seems empty, determine limits by autoincrement sequence number
            sql = (
                "SELECT "
                "  seq+1 AS minmsgid, "
                "  seq+1 AS maxmsgid, "
                "  seq+1 AS nextmsgid "
                "FROM sqlite_sequence "
                "WHERE name = 'hermesmessages'"
            )
            entry = self._db.execute(sql).fetchone()
            if entry is None:
                raise IOError("Bus database seems invalid")

        if entry["minmsgid"] <= offset <= entry["nextmsgid"]:
            self.__curoffset = offset
        else:
            raise IndexError(f"Specified offset '{offset}' doesn't exists in bus")

    def setTimeout(self, timeout_ms: int | None):
        """Set timeout (in milliseconds) before aborting when waiting for next event.
        If None, wait forever"""
        if timeout_ms is None:
            self._timeout = None
        else:
            # Stored in seconds
            self._timeout = timeout_ms / 1000

    def findNextEventOfCategory(self, category: str) -> Event | None:
        """Lookup for first message with specified category and returns it,
        or returns None if none was found before the timeout"""
        event: Event
        for event in self:
            if event.evcategory == category:
                return event

        return None  # Not found

    def __newDeadline(self) -> float | None:
        """Return the time.monotonic() value at which waiting must stop, or None
        when no timeout is set (wait forever)"""
        if self._timeout is None:
            return None
        return time.monotonic() + self._timeout

    def __iter__(self) -> Iterable[Event]:
        """Iterate over message bus returning each Event, starting at current offset.
        When every event has been consumed, wait for next message until timeout set with
        setTimeout() has been reached.

        The database is always polled at least once, so a timeout of 0 returns
        the events already available without waiting. The timeout is restarted
        each time events have been delivered.

        The current offset is moved past an event only when the consumer asks for
        the next one: if the consumer stops iterating right after receiving an
        event, that event will be delivered again by the next iteration.

        The offset must have been set with seek() or seekToBeginning() first:
        while it is None, the SQL comparison never matches any message.
        """
        sql = "SELECT * FROM hermesmessages WHERE msgid>=:offset ORDER BY msgid"
        deadline = self.__newDeadline()

        while True:
            if self._db is None:
                self.open()

            # No database means no data
            entries = []
            if self._db is not None:
                entries = self._db.execute(
                    sql, {"offset": self.__curoffset}
                ).fetchall()

            if not entries:
                # No data, wait and retry until data or timeout
                delay = self._POLL_INTERVAL
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return
                    # Never sleep past the deadline
                    delay = min(delay, remaining)
                time.sleep(delay)
                continue

            for entry in entries:
                yield self.__entryToEvent(entry)
                self.__curoffset = entry["msgid"] + 1

            # Events were consumed: restart the timeout
            deadline = self.__newDeadline()

    @classmethod
    def __entryToEvent(cls, entry) -> Event:
        """Convert sql entry to Event and returns it.

        The event is built from the JSON stored in the "data" column; its offset
        and timestamp are taken from the "msgid" and "timestamp" columns.
        """
        event = Event.from_json(entry["data"])
        event.offset = entry["msgid"]
        event.timestamp = datetime.fromtimestamp(entry["timestamp"])
        return event