Coverage for lib/config/__init__.py: 90%
188 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any, Callable, Hashable, Iterable
24from types import FrameType
26from cerberus import Validator
27from copy import deepcopy
28import argparse
29import importlib
30import os.path
31import sys
32import signal
33import threading
34import yaml
35from lib.datamodel.event import Event
36from lib.datamodel.serialization import LocalCache
37from lib.plugins import (
38 AbstractAttributePlugin,
39 AbstractDataSourcePlugin,
40 AbstractMessageBusConsumerPlugin,
41 AbstractMessageBusProducerPlugin,
42)
43from lib.utils.singleton import SingleInstance, SingleInstanceException
45import lib.utils.logging
class HermesConfigError(Exception):
    """Configuration error: the loaded config (or a plugin's settings) is rejected
    by its validation schema, or a config file contains a duplicated key"""
class HermesInvalidAppname(Exception):
    """The appname given at launch is not a string, or doesn't follow the app
    naming scheme ('server' or 'client-CLIENTNAME')"""
class HermesInvalidConfigSchemaKey(Exception):
    """A config schema file defines something else than the single top-level key
    it is expected to describe"""
class HermesPluginNotFoundError(Exception):
    """The module of the requested plugin could not be found at import time"""
class HermesPluginError(Exception):
    """The module of the requested plugin was found, but importing it failed"""
class HermesPluginClassNotFoundError(Exception):
    """The plugin module was imported, but the plugin class it declares cannot be
    retrieved from it"""
class YAMLUniqueKeyCSafeLoader(yaml.CSafeLoader):
    """YAML safe loader that refuses mappings containing the same key twice

    Tweak found on
    https://gist.github.com/pypt/94d747fe5180851196eb?permalink_comment_id=4015118#gistcomment-4015118
    """

    def construct_mapping(self, node, deep=False) -> dict[Hashable, Any]:
        """Build the mapping of specified node, after having checked that each of
        its keys is met only once.

        Raise HermesConfigError on the first duplicated key
        """
        seen_keys = set()
        for key_node, _value_node in node.value:
            key = self.construct_object(key_node, deep=deep)
            if key not in seen_keys:
                seen_keys.add(key)
                continue
            raise HermesConfigError(f"Duplicate key '{key}' found in config")

        # Every key is unique: let the parent loader build the real mapping
        return super().construct_mapping(node, deep)
class HermesConfig(LocalCache):
    """Load, validate config from config file, and expose config dict

    config always contains the following keys:
    - appname: the current app name (hermes-server, hermes-client-ldap, ...)
    - hermes: hermes global config, for servers and clients

    For server, it will also contain:
    - hermes-server: config for server

    For clients, it will also contain:
    - hermes-client: global config for client, for options defined in GenericClient
        (e.g. trashbinRetentionInDays)
    - hermes-client-CLIENTNAME: specific config for client.
        e.g. LDAP connection settings for hermes-client-ldap

    The instance contains the method setSignalsHandler() that can be called to define a
    handler for SIGINT and SIGTERM
    """

    def __init__(
        self,
        autoload: bool = True,
        from_json_dict: None | dict[str, Any] = None,
        allowMultipleInstances: bool = False,
    ):
        """Setup a config instance.

        - If from_json_dict is specified, it is used as raw config, and plugins are
          loaded when it contains the minimal entries (see hasData()).
        - Otherwise, load() is called if autoload is True.
        """
        self._config: dict[str, Any] = {}
        """ Configuration dictionary """
        self._rawconfig: dict[str, Any] = {}
        """ Raw configuration dictionary (merges config files, without plugins
        instances) """
        self._allowMultipleInstances: bool = allowMultipleInstances
        """Indicate if we must abort if another instance is already running"""

        if from_json_dict is not None:
            self._rawconfig = from_json_dict
            # Work on a copy: plugins instances will be stored in self._config only
            self._config = deepcopy(self._rawconfig)
            super().__init__(jsondataattr="_rawconfig", cachefilename="_hermesconfig")
            if self.hasData():
                self._loadDatasourcesPlugins()
                self._loadMessageBusPlugins()
                self._loadAttributesPlugins()
        elif autoload:
            self.load()

    def savecachefile(self, cacheFilename: str | None = None):
        """Override method only to disable backup files in cache"""
        return super().savecachefile(cacheFilename, dontKeepBackup=True)

    def load(self, loadplugins: bool = True, isCLI: bool = False):
        """Load and validate config of current appname, and fill config dictionary.
        Setup logging (unless isCLI is True).
        Load plugins if loadplugins is True, and validate their config.

        When isCLI is True, the file 'APPNAME-cli-config.yml' is tried first, and
        'APPNAME-config.yml' is used as a fallback when the CLI one can't be opened
        or is empty.

        Raise HermesConfigError if the config doesn't validate its schema.
        Exit the process with status 1 if another instance is already running and
        multiple instances are not allowed.
        """
        self._config = {}
        self._setAppname()
        schemas = self._getRequiredSchemas(isCLI=isCLI)
        schema = self._mergeSchemas(schemas)

        config = None
        if isCLI:
            try:
                with open(f"""{self._config["appname"]}-cli-config.yml""") as f:
                    config = yaml.load(f, Loader=YAMLUniqueKeyCSafeLoader)
            except OSError:
                # Only a missing or unreadable CLI config falls back on the main
                # config file. A CLI config that exists but is malformed must be
                # reported instead of being silently ignored.
                pass

        if config is None:
            # Not CLI, or cli-config not found (or empty)
            with open(f"""{self._config["appname"]}-config.yml""") as f:
                config = yaml.load(f, Loader=YAMLUniqueKeyCSafeLoader)

        validator = Validator(schema, purge_unknown=isCLI)
        if not validator.validate(config):
            raise HermesConfigError(validator.errors)

        self._config |= validator.normalized(config)
        # Keep a plugin-free copy of the config, used as cache data
        self._rawconfig = deepcopy(self._config)

        # NOTE(review): the extent of this 'if' body was reconstructed from a
        # listing without indentation - confirm against the original file
        if not isCLI:
            os.umask(self._config["hermes"]["umask"])
            lib.utils.logging.setup_logger(self)  # Setup logging
            LocalCache.setup(self)  # Update cache files settings
            Event.LONG_STRING_LIMIT = self._config["hermes"]["logs"][
                "long_string_limit"
            ]  # Set LONG_STRING_LIMIT according to config

        super().__init__(
            jsondataattr="_rawconfig",
            cachefilename="_hermesconfig",
            dontManageCacheDir=isCLI,
        )

        if not self._allowMultipleInstances:
            # Ensure no other instance is already running
            try:
                self.__me = SingleInstance(self._config["appname"])
            except SingleInstanceException:
                sys.exit(1)

        # Load plugins
        if loadplugins:
            self._loadDatasourcesPlugins()
            self._loadMessageBusPlugins()
            self._loadAttributesPlugins()

    def setSignalsHandler(self, handler: Callable[[int, FrameType | None], None]):
        """Defines a handler that will intercept those signals: SIGINT, SIGTERM

        The handler prototype is:
            handler(signalnumber: int, frame: FrameType | None) -> None
        See https://docs.python.org/3/library/signal.html#signal.signal
        """
        if threading.current_thread() is not threading.main_thread():
            # Do nothing if ran in a sub thread
            # (should happen only from functional tests)
            return

        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, handler)

    def __getitem__(self, key: Any) -> Any:
        """Returns config value of specified config key"""
        return self._config[key]

    def __setitem__(self, key: Any, value: Any):
        """Set specified config value of specified config key"""
        self._config[key] = value

    def __delitem__(self, key: Any):
        """Delete config value of specified config key"""
        del self._config[key]

    def __iter__(self) -> Iterable:
        """Iterate over top level config keys"""
        return iter(self._config)

    def __len__(self) -> int:
        """Returns number of items at top level of config"""
        return len(self._config)

    def hasData(self) -> bool:
        """Allow to test if current config contains minimal entries"""
        return "appname" in self

    @staticmethod
    def _getAppdir() -> str:
        """Return the absolute path of hermes source directory"""
        return os.path.realpath(os.path.dirname(__file__) + "/../../")

    def _setAppname(self):
        """Determine and store appname in config dict on first arg passed to hermes.py

        Apps MUST respect the following naming scheme:
        - server for server
        - client-CLIENTNAME for clients

        The computed appname will be prefixed by 'hermes-'
        """
        parser = argparse.ArgumentParser(description="Hermes launcher", add_help=False)
        parser.add_argument(
            "appname",
            help="The Hermes application to launch ('server', 'client-ldap', ...)",
        )
        # Other arguments are ignored here: only the appname is needed
        (args, _) = parser.parse_known_args()
        self._validateAppname(args.appname)
        self._config["appname"] = f"hermes-{args.appname}"

    def _validateAppname(self, name: str):
        """Validate specified name upon appname scheme.

        Raise HermesInvalidAppname if name is invalid
        """
        if not isinstance(name, str):
            raise HermesInvalidAppname(
                f"The specified name is of type '{type(name)}' instead of 'str'"
            )

        if name == "server":
            return

        # 'client-' alone is refused: a client name is required after the prefix
        if name.startswith("client-") and len(name) > len("client-"):
            return

        raise HermesInvalidAppname(
            f"The specified name '{name}' doesn't respect app naming scheme."
            " Please refer to the documentation"
        )

    def _getRequiredSchemas(self, isCLI: bool = False) -> dict[str, str]:
        """Fill a dict containing main config key and absolute path of config schemas
        required by current appname.
        Those values will be used to build Cerberus validation schema in order to
        validate config file.

        When isCLI is True, only the CLI schema of the 'hermes' key is returned.
        """
        main = self._config["appname"]
        appdir = self._getAppdir()

        if isCLI:
            return {"hermes": f"{appdir}/lib/config/config-schema-cli.yml"}

        schemas = {
            # Global config
            "hermes": f"{appdir}/lib/config/config-schema.yml",
        }

        if main == "hermes-server":
            # Server config
            schemas |= {"hermes-server": f"{appdir}/server/config-schema-server.yml"}
        elif main.startswith("hermes-client-"):
            # Client
            clientname = main.removeprefix("hermes-client-")
            schemas |= {
                # Global client config
                "hermes-client": f"{appdir}/clients/config-schema-client.yml",
                # Client plugin config
                f"hermes-client-{clientname}": (
                    f"{appdir}/plugins/clients/{clientname}"
                    f"/config-schema-client-{clientname}.yml"
                ),
            }

        return schemas

    @staticmethod
    def _mergeSchemas(schemas: dict[str, str]) -> dict[str, Any]:
        """Return a dict containing Cerberus validation schema for current appname

        Each schema file must define exactly one top-level key, named like the key
        of its entry in schemas. Empty schema files are ignored.

        Raise HermesInvalidConfigSchemaKey if a schema file defines other keys
        """
        schema: dict[str, Any] = {}
        for name, path in schemas.items():
            with open(path) as f:
                curschema = yaml.load(f, Loader=YAMLUniqueKeyCSafeLoader)

            # A schema can be empty: either an empty file (None) or an empty
            # mapping, which would otherwise break the key check below
            if not curschema:
                continue

            if list(curschema) != [name]:
                raise HermesInvalidConfigSchemaKey(
                    f"The schema defined in '{path}' must define only the schema of "
                    f"key '{name}'. It currently defines"
                    f" {list(set(curschema) - {name})}"
                )
            schema |= curschema

        return schema

    def _loadPlugin(
        self,
        pluginFamilyDir: str,
        pluginName: str,
        pluginSuperClass: type,
        pluginSubDictInConf: dict,
        pluginSettingsDotPath: str,
    ):
        """Generic plugin loader

        Import the module 'plugins.<pluginFamilyDir>.<pluginName>.<pluginName>',
        validate and normalize pluginSubDictInConf["settings"] against the plugin
        config schema, then store the plugin instance in
        pluginSubDictInConf["plugininstance"].

        pluginSettingsDotPath is only used to prefix validation error messages.

        Raise:
        - HermesPluginNotFoundError if the plugin module is not found
        - HermesPluginError if the plugin module import fails
        - HermesPluginClassNotFoundError if the plugin class can't be retrieved
        - HermesConfigError if the plugin settings don't validate their schema
        - TypeError if the plugin instance is not an instance of pluginSuperClass
        """
        appdir = self._getAppdir()

        modulepath = f"plugins.{pluginFamilyDir}.{pluginName}.{pluginName}"
        try:
            module = importlib.import_module(modulepath)
        except ModuleNotFoundError as e:
            raise HermesPluginNotFoundError(
                f"Unable to load plugin '{pluginName}' of type "
                f"'{pluginFamilyDir}': {str(e)}"
            ) from e
        except Exception as e:
            raise HermesPluginError(
                f"Unable to load plugin '{pluginName}' of type '{pluginFamilyDir}',"
                f" probably due to a syntax error in plugin code: {str(e)}"
            ) from e

        try:
            # The module must declare the name of its plugin class
            plugin_cls = getattr(module, module.HERMES_PLUGIN_CLASSNAME)
        except AttributeError as e:
            raise HermesPluginClassNotFoundError(str(e)) from e

        path = (
            f"{appdir}/plugins/{pluginFamilyDir}/{pluginName}"
            f"/config-schema-plugin-{pluginName}.yml"
        )

        with open(path) as f:
            schema = yaml.load(f, Loader=YAMLUniqueKeyCSafeLoader)

        if schema is None:
            # Only when the schema file is empty, meaning there's no possible settings
            schema = {}

        validator = Validator(schema)
        if not validator.validate(pluginSubDictInConf["settings"]):
            raise HermesConfigError(f"{pluginSettingsDotPath}: {validator.errors}")

        pluginSubDictInConf["settings"] = validator.normalized(
            pluginSubDictInConf["settings"]
        )

        # Store the instance in config only once its type has been checked, to
        # avoid leaving an invalid plugin instance in config on failure
        instance = plugin_cls(pluginSubDictInConf["settings"])
        if not isinstance(instance, pluginSuperClass):
            raise TypeError(
                f"Plugin <{pluginName}> is not a subclass of"
                f" '{pluginSuperClass.__name__}'"
            )
        pluginSubDictInConf["plugininstance"] = instance

        # NOTE(review): '__hermes__' is not defined in this file, presumably
        # injected in builtins elsewhere - confirm
        __hermes__.logger.info(f"Loaded plugin {pluginFamilyDir}/{pluginName}")

    def _loadDatasourcesPlugins(self):
        """Load every Datasources plugins (server only: no-op for other apps)"""
        if self["appname"] != "hermes-server":
            return

        for name, source in self["hermes"]["plugins"]["datasources"].items():
            pluginname = source["type"]
            self._loadPlugin(
                pluginFamilyDir="datasources",
                pluginName=pluginname,
                pluginSuperClass=AbstractDataSourcePlugin,
                pluginSubDictInConf=source,
                pluginSettingsDotPath=f"hermes.plugins.datasources.{name}.settings",
            )

    def _loadMessageBusPlugins(self):
        """Load every MessageBus plugins: producers for server, consumers for
        other apps"""
        if self["appname"] == "hermes-server":
            familydir = "messagebus_producers"
            superclass = AbstractMessageBusProducerPlugin
        else:
            familydir = "messagebus_consumers"
            superclass = AbstractMessageBusConsumerPlugin

        messagebus = self["hermes"]["plugins"]["messagebus"]
        for pluginname, source in messagebus.items():
            self._loadPlugin(
                pluginFamilyDir=familydir,
                pluginName=pluginname,
                pluginSuperClass=superclass,
                pluginSubDictInConf=source,
                pluginSettingsDotPath=(
                    f"hermes.plugins.messagebus.{pluginname}.settings"
                ),
            )

        # As only one can be registered, put instance one level upper.
        # Must be done once the iteration is over, as it adds a key to the
        # iterated dict.
        # NOTE(review): placement after the loop was reconstructed from a listing
        # without indentation - confirm against the original file
        messagebus["plugininstance"] = source["plugininstance"]

    def _loadAttributesPlugins(self):
        """Load every Attributes plugins, and store their filters by plugin name in
        config, under hermes.plugins.attributes._jinjafilters"""
        jinjafilters = {}

        attributes = self["hermes"]["plugins"]["attributes"]
        for pluginname, source in attributes.items():
            self._loadPlugin(
                pluginFamilyDir="attributes",
                pluginName=pluginname,
                pluginSuperClass=AbstractAttributePlugin,
                pluginSubDictInConf=source,
                pluginSettingsDotPath=(
                    f"hermes.plugins.attributes.{pluginname}.settings"
                ),
            )

            jinjafilters[pluginname] = source["plugininstance"].filter

        # Added once the iteration is over, as it adds a key to the iterated dict
        attributes["_jinjafilters"] = jinjafilters