Coverage for lib/plugins.py: 65%
92 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any, Iterable
24from jinja2 import Undefined
26from lib.datamodel.event import Event
class FailedToSendEventError(Exception):
    """Error signalling that an event could not be delivered to the message bus.

    Raised by AbstractMessageBusProducerPlugin.send() when sending fails.
    """
class AbstractAttributePlugin:
    """Base class of attribute plugins (Jinja).

    Attribute plugins can be used as Jinja filters in the
    hermes-server.datamodel settings to transform values dynamically.

    Plugin settings are read from the config file. A cerberus
    config-schema-plugin-PLUGINNAME.yml file may be shipped in the plugin dir; it
    holds the validation rules schemas (yaml format) that let Hermes validate the
    plugin settings.
    See https://docs.python-cerberus.org/validation-rules.html
    """

    def __init__(self, settings: dict[str, Any]):
        """Create the plugin, keeping a copy of *settings* in self._settings.

        The copy is made with dict.copy(), so it is shallow: nested objects are
        shared with the dict passed by the caller.
        """
        self._settings = settings.copy()

    def filter(self, value: Any | None | Undefined, *args: Any, **kwds: Any) -> Any:
        """Apply the plugin to *value* and return the result.

        The base implementation always raises NotImplementedError; subclasses
        provide the actual transformation.
        """
        raise NotImplementedError()
56class AbstractDataSourcePlugin:
57 """Superclass of datasource plugins, to interface Hermes with database, ldap
58 directory, webservice or whatever you need
59 The connection is established in a with-statement context handled by this class that
60 will call open() and close() methods
62 Settings can be provided in config file, and a cerberus
63 config-schema-plugin-PLUGINNAME.yml can be provided in the plugin dir, containing
64 the settings validation rules schemas in yaml format to allow Hermes to validate
65 plugin settings.
66 See https://docs.python-cerberus.org/validation-rules.html
67 """
69 def __init__(self, settings: dict[str, Any]):
70 """Instantiate new plugin and store a copy of its settings dict in
71 self._settings"""
72 self._settings = settings.copy()
74 def __enter__(self) -> "AbstractDataSourcePlugin":
75 """Calls open() method when entering in a with statement"""
76 self.open()
77 return self
79 def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
80 """Calls close() method when exiting of a with statement.
81 Handle any close() exception"""
82 try:
83 self.close()
84 except Exception as e:
85 __hermes__.logger.error(
86 f"Error when disconnecting from {self.__class__.__name__}: {str(e)}"
87 )
88 return False
90 # Return false if called because an exception occurred in context
91 if exc_type:
92 return False
93 return True
95 def open(self):
96 """Establish connection with datasource"""
97 raise NotImplementedError
99 def close(self):
100 """Close connection with datasource"""
101 raise NotImplementedError
103 def fetch(
104 self,
105 query: str | None,
106 vars: dict[str, Any],
107 ) -> list[dict[str, Any]]:
108 """Fetch data from datasource with specified query and optional queryvars.
109 Returns a list of dict containing each entry fetched, with REMOTE_ATTRIBUTES
110 as keys, and corresponding fetched values as values"""
111 raise NotImplementedError
113 def add(self, query: str | None, vars: dict[str, Any]):
114 """Add data to datasource with specified query and optional queryvars"""
115 raise NotImplementedError
117 def delete(self, query: str | None, vars: dict[str, Any]):
118 """Delete data from datasource with specified query and optional queryvars"""
119 raise NotImplementedError
121 def modify(self, query: str | None, vars: dict[str, Any]):
122 """Modify data on datasource with specified query and optional queryvars"""
123 raise NotImplementedError
class AbstractMessageBusProducerPlugin:
    """Superclass of message bus producers plugins, to allow Hermes-server to emit
    events to message bus.

    The connection to the bus is established in a with-statement context handled
    by this class, that will call open() and close() methods.
    The subclasses must define the _send() method, and should not override the
    send() method, which exists only to call _send() and handle errors.

    Settings can be provided in config file, and a cerberus
    config-schema-plugin-PLUGINNAME.yml can be provided in the plugin dir, containing
    the settings validation rules schemas in yaml format to allow Hermes to validate
    plugin settings.
    See https://docs.python-cerberus.org/validation-rules.html
    """

    def __init__(self, settings: dict[str, Any]):
        """Instantiate new plugin and store a (shallow) copy of its settings dict
        in self._settings"""
        self._settings = settings.copy()

    def __enter__(self) -> "AbstractMessageBusProducerPlugin":
        """Calls open() method when entering in a with statement, and returns the
        plugin instance"""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Calls close() method when exiting of a with statement.

        Any close() exception is logged and not re-raised. Returns True only
        when close() succeeded and no exception occurred in the context;
        returns False otherwise, so a pending exception keeps propagating.
        """
        try:
            self.close()
        except Exception as e:
            # __hermes__ is not defined in this module: presumably a global
            # injected by the application - TODO confirm
            __hermes__.logger.error(
                f"Error when disconnecting from {self.__class__.__name__}: {str(e)}"
            )
            return False

        # Return false if called because an exception occurred in context
        if exc_type:
            return False
        return True

    def open(self) -> Any:
        """Establish connection with messagebus. Must be overridden"""
        raise NotImplementedError

    def close(self):
        """Close connection with messagebus. Must be overridden"""
        raise NotImplementedError

    def _send(self, event: Event):
        """Send specified event to message bus. Must be overridden"""
        raise NotImplementedError

    def send(self, event: Event):
        """Call _send() with specified event to message bus, and handle errors.

        Any exception raised by _send() is logged as critical, then replaced by
        a FailedToSendEventError carrying the same message.

        Raises:
            FailedToSendEventError: when _send() raised an exception
        """
        try:
            self._send(event)
        except Exception as e:
            __hermes__.logger.critical(f"Failed to send event: {str(e)}")
            # "from None" hides the original exception context from the
            # traceback; its message is kept in the new error
            raise FailedToSendEventError(str(e)) from None
class AbstractMessageBusConsumerPlugin:
    """Base class of message bus consumer plugins, used by Hermes-clients to fetch
    events from the message bus.

    Instances are context managers: entering a with-statement calls open() and
    leaving it calls close().

    Plugin settings are read from the config file. A cerberus
    config-schema-plugin-PLUGINNAME.yml file may be shipped in the plugin dir; it
    holds the validation rules schemas (yaml format) that let Hermes validate the
    plugin settings.
    See https://docs.python-cerberus.org/validation-rules.html
    """

    def __init__(self, settings: dict[str, Any]):
        """Create the plugin, keeping a (shallow) copy of *settings* in
        self._settings"""
        self._settings = settings.copy()

    def __enter__(self) -> "AbstractMessageBusConsumerPlugin":
        """Open the bus connection through open() and return the plugin itself"""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Close the bus connection through close() when leaving a with statement.

        A failure of close() is logged and not re-raised. The result is True
        only when close() succeeded and the context exited without exception;
        otherwise False is returned, so a pending exception keeps propagating.
        """
        try:
            self.close()
        except Exception as e:
            # __hermes__ is not defined in this module: presumably a global
            # injected by the application - TODO confirm
            __hermes__.logger.error(
                f"Error when disconnecting from {self.__class__.__name__}: {str(e)}"
            )
            return False
        else:
            # False when an exception occurred in the context, True otherwise
            return not exc_type

    def open(self) -> Any:
        """Establish the connection with the message bus (to be overridden)"""
        raise NotImplementedError()

    def close(self):
        """Close the connection with the message bus (to be overridden)"""
        raise NotImplementedError()

    def seekToBeginning(self):
        """Seek to the first (oldest) event in the message bus queue
        (to be overridden)"""
        raise NotImplementedError()

    def seek(self, offset: Any):
        """Seek to the event at the specified offset in the message bus queue
        (to be overridden)"""
        raise NotImplementedError()

    def setTimeout(self, timeout_ms: int | None):
        """Set the timeout (in milliseconds) before aborting when waiting for the
        next event. If None, wait forever (to be overridden)"""
        raise NotImplementedError()

    def findNextEventOfCategory(self, category: str) -> Event | None:
        """Look up the first message with the specified category and return it,
        or return None if none was found (to be overridden)"""
        raise NotImplementedError()

    def __iter__(self) -> Iterable:
        """Iterate over the message bus, returning each Event from the current
        offset.

        Once every event has been consumed, wait for the next message until the
        timeout set with setTimeout() has been reached (to be overridden).
        """
        raise NotImplementedError()