Coverage for lib/plugins.py: 65%

92 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any, Iterable 

24from jinja2 import Undefined 

25 

26from lib.datamodel.event import Event 

27 

28 

29class FailedToSendEventError(Exception): 

30 """Raised when AbstractMessageBusProducerPlugin was unable to send event to 

31 MessageBus""" 

32 

33 

34class AbstractAttributePlugin: 

35 """Superclass of attribute plugins (Jinja). 

36 Those plugins can be used as Jinja filters in hermes-server.datamodel settings and 

37 dynamically transform values 

38 

39 Settings can be provided in config file, and a cerberus 

40 config-schema-plugin-PLUGINNAME.yml can be provided in the plugin dir, containing 

41 the settings validation rules schemas in yaml format to allow Hermes to validate 

42 plugin settings. 

43 See https://docs.python-cerberus.org/validation-rules.html 

44 """ 

45 

46 def __init__(self, settings: dict[str, Any]): 

47 """Instantiate new plugin and store a copy of its settings dict in 

48 self._settings""" 

49 self._settings = settings.copy() 

50 

51 def filter(self, value: Any | None | Undefined, *args: Any, **kwds: Any) -> Any: 

52 """Call the plugin with specified value, and returns the result""" 

53 raise NotImplementedError 

54 

55 

56class AbstractDataSourcePlugin: 

57 """Superclass of datasource plugins, to interface Hermes with database, ldap 

58 directory, webservice or whatever you need 

59 The connection is established in a with-statement context handled by this class that 

60 will call open() and close() methods 

61 

62 Settings can be provided in config file, and a cerberus 

63 config-schema-plugin-PLUGINNAME.yml can be provided in the plugin dir, containing 

64 the settings validation rules schemas in yaml format to allow Hermes to validate 

65 plugin settings. 

66 See https://docs.python-cerberus.org/validation-rules.html 

67 """ 

68 

69 def __init__(self, settings: dict[str, Any]): 

70 """Instantiate new plugin and store a copy of its settings dict in 

71 self._settings""" 

72 self._settings = settings.copy() 

73 

74 def __enter__(self) -> "AbstractDataSourcePlugin": 

75 """Calls open() method when entering in a with statement""" 

76 self.open() 

77 return self 

78 

79 def __exit__(self, exc_type, exc_val, exc_tb) -> bool: 

80 """Calls close() method when exiting of a with statement. 

81 Handle any close() exception""" 

82 try: 

83 self.close() 

84 except Exception as e: 

85 __hermes__.logger.error( 

86 f"Error when disconnecting from {self.__class__.__name__}: {str(e)}" 

87 ) 

88 return False 

89 

90 # Return false if called because an exception occurred in context 

91 if exc_type: 

92 return False 

93 return True 

94 

95 def open(self): 

96 """Establish connection with datasource""" 

97 raise NotImplementedError 

98 

99 def close(self): 

100 """Close connection with datasource""" 

101 raise NotImplementedError 

102 

103 def fetch( 

104 self, 

105 query: str | None, 

106 vars: dict[str, Any], 

107 ) -> list[dict[str, Any]]: 

108 """Fetch data from datasource with specified query and optional queryvars. 

109 Returns a list of dict containing each entry fetched, with REMOTE_ATTRIBUTES 

110 as keys, and corresponding fetched values as values""" 

111 raise NotImplementedError 

112 

113 def add(self, query: str | None, vars: dict[str, Any]): 

114 """Add data to datasource with specified query and optional queryvars""" 

115 raise NotImplementedError 

116 

117 def delete(self, query: str | None, vars: dict[str, Any]): 

118 """Delete data from datasource with specified query and optional queryvars""" 

119 raise NotImplementedError 

120 

121 def modify(self, query: str | None, vars: dict[str, Any]): 

122 """Modify data on datasource with specified query and optional queryvars""" 

123 raise NotImplementedError 

124 

125 

126class AbstractMessageBusProducerPlugin: 

127 """Superclass of message bus producers plugins, to allow Hermes-server to emit 

128 events to message bus. 

129 The connection to the bus is established in a with-statement context handled by this 

130 class that will call open() and close() methods. 

131 The subclasses must defines _open() method, and not override open() method that 

132 exists only to call _open() and handle errors. 

133 

134 Settings can be provided in config file, and a cerberus 

135 config-schema-plugin-PLUGINNAME.yml can be provided in the plugin dir, containing 

136 the settings validation rules schemas in yaml format to allow Hermes to validate 

137 plugin settings. 

138 See https://docs.python-cerberus.org/validation-rules.html 

139 """ 

140 

141 def __init__(self, settings: dict[str, Any]): 

142 """Instantiate new plugin and store a copy of its settings dict in 

143 self._settings""" 

144 self._settings = settings.copy() 

145 

146 def __enter__(self) -> "AbstractMessageBusProducerPlugin": 

147 """Calls open() method when entering in a with statement""" 

148 self.open() 

149 return self 

150 

151 def __exit__(self, exc_type, exc_val, exc_tb) -> bool: 

152 """Calls close() method when exiting of a with statement. 

153 Handle any close() exception""" 

154 try: 

155 self.close() 

156 except Exception as e: 

157 __hermes__.logger.error( 

158 f"Error when disconnecting from {self.__class__.__name__}: {str(e)}" 

159 ) 

160 return False 

161 

162 # Return false if called because an exception occurred in context 

163 if exc_type: 

164 return False 

165 return True 

166 

167 def open(self) -> Any: 

168 """Establish connection with messagebus""" 

169 raise NotImplementedError 

170 

171 def close(self): 

172 """Close connection with messagebus""" 

173 raise NotImplementedError 

174 

175 def _send(self, event: Event): 

176 """Send specified event to message bus""" 

177 raise NotImplementedError 

178 

179 def send(self, event: Event): 

180 """Call _send() with specified event to message bus, and handle errors""" 

181 try: 

182 self._send(event) 

183 except Exception as e: 

184 __hermes__.logger.critical(f"Failed to send event: {str(e)}") 

185 raise FailedToSendEventError(str(e)) from None 

186 

187 

188class AbstractMessageBusConsumerPlugin: 

189 """Superclass of message bus consumers plugins, to allow Hermes-clients to fetch 

190 events from message bus. 

191 The connection to the bus is established in a with-statement context handled by this 

192 class that will call open() and close() methods. 

193 

194 Settings can be provided in config file, and a cerberus 

195 config-schema-plugin-PLUGINNAME.yml can be provided in the plugin dir, containing 

196 the settings validation rules schemas in yaml format to allow Hermes to validate 

197 plugin settings. 

198 See https://docs.python-cerberus.org/validation-rules.html 

199 """ 

200 

201 def __init__(self, settings: dict[str, Any]): 

202 """Instantiate new plugin and store a copy of its settings dict in 

203 self._settings""" 

204 self._settings = settings.copy() 

205 

206 def __enter__(self) -> "AbstractMessageBusConsumerPlugin": 

207 """Calls open() method when entering in a with statement""" 

208 self.open() 

209 return self 

210 

211 def __exit__(self, exc_type, exc_val, exc_tb) -> bool: 

212 """Calls close() method when exiting of a with statement. 

213 Handle any close() exception""" 

214 try: 

215 self.close() 

216 except Exception as e: 

217 __hermes__.logger.error( 

218 f"Error when disconnecting from {self.__class__.__name__}: {str(e)}" 

219 ) 

220 return False 

221 

222 # Return false if called because an exception occurred in context 

223 if exc_type: 

224 return False 

225 return True 

226 

227 def open(self) -> Any: 

228 """Establish connection with messagebus""" 

229 raise NotImplementedError 

230 

231 def close(self): 

232 """Close connection with messagebus""" 

233 raise NotImplementedError 

234 

235 def seekToBeginning(self): 

236 """Seek to first (older) event in message bus queue""" 

237 raise NotImplementedError 

238 

239 def seek(self, offset: Any): 

240 """Seek to specified offset event in message bus queue""" 

241 raise NotImplementedError 

242 

243 def setTimeout(self, timeout_ms: int | None): 

244 """Set timeout (in milliseconds) before aborting when waiting for next event. 

245 If None, wait forever""" 

246 raise NotImplementedError 

247 

248 def findNextEventOfCategory(self, category: str) -> Event | None: 

249 """Lookup for first message with specified category and returns it, 

250 or returns None if none was found""" 

251 raise NotImplementedError 

252 

253 def __iter__(self) -> Iterable: 

254 """Iterate over message bus returning each Event, starting at current offset. 

255 When every event has been consumed, wait for next message until timeout set with 

256 setTimeout() has been reached""" 

257 raise NotImplementedError