Coverage for plugins/messagebus_consumers/sqlite/sqlite.py: 70%

97 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:25 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any, Iterable 

24 

25from lib.plugins import AbstractMessageBusConsumerPlugin 

26from lib.datamodel.event import Event 

27 

28from datetime import datetime, timedelta 

29import sqlite3 

30import time 

31 

# Name of the class that this module exposes as its plugin implementation.
# NOTE(review): presumably read by the Hermes plugin loader to locate the class
# to instantiate - confirm against the loader code.
HERMES_PLUGIN_CLASSNAME: str | None = "SqliteConsumerPlugin"
"""The plugin class name defined in this module file"""

34 

35 

class SqliteConsumerPlugin(AbstractMessageBusConsumerPlugin):
    """Sqlite message bus consumer plugin, to allow Hermes-clients to fetch events
    from an sqlite database (useful for tests, not heavily tested for production use).

    The database is opened read-only. The queries below expect a ``hermesmessages``
    table exposing at least the ``msgid``, ``timestamp`` and ``data`` columns, and
    rely on the ``sqlite_sequence`` table to determine the next ``msgid``.
    """

    # Delay (in seconds) between two polls of the database when no event is pending
    _POLL_INTERVAL: float = 0.5

    def __init__(self, settings: dict[str, Any]):
        """Instantiate new plugin and store a copy of its settings dict in
        self._settings"""
        super().__init__(settings)
        # Connection to the bus database, None while not (or not yet) opened
        self._db: sqlite3.Connection | None = None
        # msgid of the next event to deliver, None until a seek has been done.
        # NOTE(review): iterating before any seek compares msgid with NULL in SQL,
        # which matches no row - confirm callers always seek first.
        self.__curoffset: int | None = None
        # Timeout in seconds when waiting for next event, None to wait forever
        self._timeout: float | None = 1

    def open(self) -> Any:
        """Establish connection with messagebus.

        The database file is opened read-only. If sqlite is unable to open it,
        the failure is only logged and self._db is left untouched, so callers
        must check self._db afterwards.
        """
        database = f"file:{self._settings['uri']}?mode=ro"
        try:
            self._db = sqlite3.connect(database=database, uri=True)
        except sqlite3.OperationalError:
            # No db file found
            # NOTE(review): __hermes__ is not defined in this file, presumably it
            # is injected by the Hermes runtime - confirm.
            __hermes__.logger.info(
                f"Sqlite bus file '{self._settings['uri']}' doesn't exist yet"
            )
        else:
            # Allow access to the columns by name
            self._db.row_factory = sqlite3.Row

    def close(self):
        """Close connection with messagebus"""
        if self._db is not None:
            self._db.close()
        self._db = None

    def seekToBeginning(self):
        """Seek to first (older) event in message bus queue.

        When the database can't be opened, or when the bus doesn't contain any
        event yet, the offset is set to -1 so that any event added later will be
        delivered.
        """
        if not self._db:
            self.open()
            if not self._db:
                self.__curoffset = -1
                return

        sql = "SELECT MIN(msgid) AS msgid FROM hermesmessages"
        entry = self._db.execute(sql).fetchone()

        # MIN() is an aggregate: it always returns one row, whose value is NULL
        # when the table is empty. Both cases mean "no event yet".
        if entry is None or entry["msgid"] is None:
            self.__curoffset = -1
            return
        self.__curoffset = entry["msgid"]

    def seek(self, offset: Any):
        """Seek to specified offset event in message bus queue.

        The offset is accepted when it is between the oldest msgid present in the
        bus and the next msgid that will be generated (both included).

        Raises:
            IndexError: the database can't be opened, or the offset is out of the
                bounds of the bus.
            IOError: the bounds of the bus can't be determined.
        """
        if not self._db:
            self.open()
            if not self._db:
                raise IndexError(
                    f"Specified offset '{offset}' doesn't exists in bus (bus is empty)"
                ) from None

        sql = (
            "SELECT "
            "  min(hermesmessages.msgid) AS minmsgid, "
            "  max(hermesmessages.msgid) AS maxmsgid, "
            "  max(sqlite_sequence.seq)+1 AS nextmsgid "
            "FROM hermesmessages, sqlite_sequence "
            "WHERE sqlite_sequence.name = 'hermesmessages'"
        )
        entry = self._db.execute(sql).fetchone()
        if entry is None:
            raise IOError("Bus database seems invalid") from None

        if (
            entry["minmsgid"] is None
            or entry["maxmsgid"] is None
            or entry["nextmsgid"] is None
        ):
            # The bus seems empty, determine limits by autoincrement sequence number
            sql = (
                "SELECT "
                "  seq+1 AS minmsgid, "
                "  seq+1 AS maxmsgid, "
                "  seq+1 AS nextmsgid "
                "FROM sqlite_sequence "
                "WHERE name = 'hermesmessages'"
            )
            entry = self._db.execute(sql).fetchone()
            if entry is None:
                raise IOError("Bus database seems invalid") from None

        if entry["minmsgid"] <= offset <= entry["nextmsgid"]:
            self.__curoffset = offset
        else:
            raise IndexError(
                f"Specified offset '{offset}' doesn't exists in bus"
            ) from None

    def setTimeout(self, timeout_ms: int | None):
        """Set timeout (in milliseconds) before aborting when waiting for next event.
        If None, wait forever"""
        if timeout_ms is None:
            self._timeout = None
        else:
            # Stored in seconds
            self._timeout = timeout_ms / 1000

    def findNextEventOfCategory(self, category: str) -> Event | None:
        """Lookup for first message with specified category and returns it,
        or returns None if none was found"""
        event: Event
        for event in self:
            if event.evcategory == category:
                return event

        return None  # Not found

    def __iter__(self) -> Iterable:
        """Iterate over message bus returning each Event, starting at current offset.
        When every event has been consumed, wait for next message until timeout set with
        setTimeout() has been reached.

        The bus is always polled at least once, so a timeout of 0 returns the
        pending events without waiting. With a timeout of None, the iteration never
        ends by itself.
        """
        deadline = self.__nextDeadline()
        while True:
            entries = self.__fetchPending()
            if entries:
                for entry in entries:
                    yield self.__entryToEvent(entry)
                    # The offset is only moved forward when the consumer requests
                    # the following event
                    self.__curoffset = entry["msgid"] + 1
                # Some data was received: restart the wait from now
                deadline = self.__nextDeadline()
                continue

            # No database or no data: wait and retry until data or timeout
            if deadline is None:
                time.sleep(self._POLL_INTERVAL)
                continue

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            # Never sleep beyond the deadline
            time.sleep(min(self._POLL_INTERVAL, remaining))

    def __nextDeadline(self) -> float | None:
        """Returns the time.monotonic() value at which the wait for next event must
        be aborted, or None if the wait is unlimited"""
        if self._timeout is None:
            return None
        return time.monotonic() + self._timeout

    def __fetchPending(self) -> list:
        """Returns the rows of every event whose msgid is greater than or equal to
        current offset, ordered by msgid. Returns an empty list if the database
        can't be opened"""
        if not self._db:
            self.open()
            if not self._db:
                return []

        sql = "SELECT * FROM hermesmessages WHERE msgid>=:offset ORDER BY msgid"
        return self._db.execute(sql, {"offset": self.__curoffset}).fetchall()

    @classmethod
    def __entryToEvent(cls, entry) -> Event:
        """Convert sql entry to Event and returns it.

        The Event is built from the JSON stored in the "data" column, then its
        offset and timestamp are filled from the "msgid" and "timestamp" columns.
        """
        event = Event.from_json(entry["data"])
        event.offset = entry["msgid"]
        event.timestamp = datetime.fromtimestamp(entry["timestamp"])
        return event