Coverage for lib/config/__init__.py: 90%

188 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any, Callable, Hashable, Iterable 

24from types import FrameType 

25 

26from cerberus import Validator 

27from copy import deepcopy 

28import argparse 

29import importlib 

30import os.path 

31import sys 

32import signal 

33import threading 

34import yaml 

35from lib.datamodel.event import Event 

36from lib.datamodel.serialization import LocalCache 

37from lib.plugins import ( 

38 AbstractAttributePlugin, 

39 AbstractDataSourcePlugin, 

40 AbstractMessageBusConsumerPlugin, 

41 AbstractMessageBusProducerPlugin, 

42) 

43from lib.utils.singleton import SingleInstance, SingleInstanceException 

44 

45import lib.utils.logging 

46 

47 

48class HermesConfigError(Exception): 

49 """Raised when the config file doesn't validate the config schema""" 

50 

51 

52class HermesInvalidAppname(Exception): 

53 """Raised when the appname specified on launch doesn't respect app naming scheme""" 

54 

55 

56class HermesInvalidConfigSchemaKey(Exception): 

57 """Raised when one config schema is defining another key than its name""" 

58 

59 

60class HermesPluginNotFoundError(Exception): 

61 """Raised when specified plugin is not found""" 

62 

63 

64class HermesPluginError(Exception): 

65 """Raised when specified plugin exists but cannot be imported""" 

66 

67 

68class HermesPluginClassNotFoundError(Exception): 

69 """Raised when declared plugin class cannot be found""" 

70 

71 

72class YAMLUniqueKeyCSafeLoader(yaml.CSafeLoader): 

73 """Override yaml load to raise an error when some duplicated keys are found 

74 Tweak found on 

75 https://gist.github.com/pypt/94d747fe5180851196eb?permalink_comment_id=4015118#gistcomment-4015118 

76 """ 

77 

78 def construct_mapping(self, node, deep=False) -> dict[Hashable, Any]: 

79 mapping = set() 

80 for key_node, value_node in node.value: 

81 key = self.construct_object(key_node, deep=deep) 

82 if key in mapping: 

83 raise HermesConfigError(f"Duplicate key '{key}' found in config") 

84 mapping.add(key) 

85 return super().construct_mapping(node, deep) 

86 

87 

88class HermesConfig(LocalCache): 

89 """Load, validate config from config file, and expose config dict 

90 

91 config always contains the following keys: 

92 - appname: the current app name (hermes-server, hermes-client-ldap, ...) 

93 - hermes: hermes global config, for servers and clients 

94 

95 For server, it will contains too: 

96 - hermes-server: config for server 

97 

98 For clients, it will contains too: 

99 - hermes-client: global config for client, for options defined in GenericClient 

100 (e.g. trashbinRetentionInDays) 

101 - hermes-client-CLIENTNAME: specific config for client. 

102 e.g. LDAP connection settings for hermes-client-ldap 

103 

104 The instance contains the method setSignalsHandler() that can be called to define a 

105 handler for SIGINT and SIGTERM 

106 """ 

107 

108 def __init__( 

109 self, 

110 autoload: bool = True, 

111 from_json_dict: None | dict[str, Any] = None, 

112 allowMultipleInstances: bool = False, 

113 ): 

114 """Setup a config instance, and call load() if autoload is True""" 

115 self._config: dict[str, Any] = {} 

116 """ Configuration dictionary """ 

117 self._rawconfig: dict[str, Any] = {} 

118 """ Raw configuration dictionary (merges config files, without plugins 

119 instances) """ 

120 self._allowMultipleInstances: bool = allowMultipleInstances 

121 """Indicate if we must abort if another instance is already running""" 

122 

123 if from_json_dict is not None: 

124 self._rawconfig = from_json_dict 

125 self._config = deepcopy(self._rawconfig) 

126 super().__init__(jsondataattr="_rawconfig", cachefilename="_hermesconfig") 

127 if self.hasData(): 

128 self._loadDatasourcesPlugins() 

129 self._loadMessageBusPlugins() 

130 self._loadAttributesPlugins() 

131 else: 

132 if autoload: 

133 self.load() 

134 

135 def savecachefile(self, cacheFilename: str | None = None): 

136 """Override method only to disable backup files in cache""" 

137 return super().savecachefile(cacheFilename, dontKeepBackup=True) 

138 

139 def load(self, loadplugins: bool = True, isCLI: bool = False): 

140 """Load and validate config of current appname, and fill config dictionary. 

141 Setup logging, and signals handlers. 

142 Load plugins, and validate their config. 

143 """ 

144 self._config = {} 

145 self._setAppname() 

146 schemas = self._getRequiredSchemas(isCLI=isCLI) 

147 schema = self._mergeSchemas(schemas) 

148 

149 config = None 

150 if isCLI: 

151 try: 

152 with open(f"""{self._config["appname"]}-cli-config.yml""") as f: 

153 config = yaml.load(f, Loader=YAMLUniqueKeyCSafeLoader) 

154 except Exception: 

155 pass 

156 

157 if config is None: 

158 # Not CLI or cli-config not found 

159 with open(f"""{self._config["appname"]}-config.yml""") as f: 

160 config = yaml.load(f, Loader=YAMLUniqueKeyCSafeLoader) 

161 

162 validator = Validator(schema, purge_unknown=isCLI) 

163 if not validator.validate(config): 

164 raise HermesConfigError(validator.errors) 

165 

166 self._config |= validator.normalized(config) 

167 self._rawconfig = deepcopy(self._config) 

168 

169 if not isCLI: 

170 os.umask(self._config["hermes"]["umask"]) 

171 lib.utils.logging.setup_logger(self) # Setup logging 

172 LocalCache.setup(self) # Update cache files settings 

173 Event.LONG_STRING_LIMIT = self._config["hermes"]["logs"][ 

174 "long_string_limit" 

175 ] # Set LONG_STRING_LIMIT according to config 

176 

177 super().__init__( 

178 jsondataattr="_rawconfig", 

179 cachefilename="_hermesconfig", 

180 dontManageCacheDir=isCLI, 

181 ) 

182 

183 if not self._allowMultipleInstances: 

184 # Ensure no other instance is already running 

185 try: 

186 self.__me = SingleInstance(self._config["appname"]) 

187 except SingleInstanceException: 

188 sys.exit(1) 

189 

190 # Load plugins 

191 if loadplugins: 

192 self._loadDatasourcesPlugins() 

193 self._loadMessageBusPlugins() 

194 self._loadAttributesPlugins() 

195 

196 def setSignalsHandler(self, handler: Callable[[int, FrameType | None], None]): 

197 """Defines a handler that will intercept thoses signals: SIGINT, SIGTERM 

198 

199 The handler prototype is: 

200 handler(signalnumber: int, frame: FrameType | None) -> None 

201 See https://docs.python.org/3/library/signal.html#signal.signal 

202 """ 

203 if threading.current_thread() is not threading.main_thread(): 

204 # Do nothing if ran in a sub thread 

205 # (should happen only from from functional tests) 

206 return 

207 for signum in [signal.SIGINT, signal.SIGTERM]: 

208 signal.signal(signum, handler) 

209 

210 def __getitem__(self, key: Any) -> Any: 

211 """Returns config value of specified config key""" 

212 return self._config[key] 

213 

214 def __setitem__(self, key: Any, value: Any): 

215 """Set specified config value of specified config key""" 

216 self._config[key] = value 

217 

218 def __delitem__(self, key: Any): 

219 """Delete config value of specified config key""" 

220 del self._config[key] 

221 

222 def __iter__(self) -> Iterable: 

223 """Iterate over top level config keys""" 

224 return iter(self._config) 

225 

226 def __len__(self) -> int: 

227 """Returns number of items at top level of config""" 

228 return len(self._config) 

229 

230 def hasData(self) -> bool: 

231 """Allow to test if current config contains minimal entries""" 

232 return "appname" in self 

233 

234 def _setAppname(self): 

235 """Determine and store appname in config dict on first arg passed to hermes.py 

236 

237 Apps MUST respect the following naming scheme: 

238 - server for server 

239 - client-CLIENTNAME for clients 

240 

241 The computed appname will be prefixed by 'hermes-' 

242 """ 

243 parser = argparse.ArgumentParser(description="Hermes launcher", add_help=False) 

244 parser.add_argument( 

245 "appname", 

246 help="The Hermes application to launch ('server', 'client-ldap', ...)", 

247 ) 

248 (args, _) = parser.parse_known_args() 

249 self._validateAppname(args.appname) 

250 self._config["appname"] = f"hermes-{args.appname}" 

251 

252 def _validateAppname(self, name: str): 

253 """Validate specified name upon appname scheme. 

254 

255 Raise HermesInvalidAppname if name is invalid 

256 """ 

257 if type(name) is not str: 

258 raise HermesInvalidAppname( 

259 f"The specified name is of type '{type(name)}' instead of 'str'" 

260 ) 

261 

262 if name == "server": 

263 return 

264 

265 if name.startswith("client-") and len(name) > len("client-"): 

266 return 

267 

268 raise HermesInvalidAppname( 

269 f"The specified name '{name}' doesn't respect app naming scheme." 

270 " Please refer to the documentation" 

271 ) 

272 

273 def _getRequiredSchemas(self, isCLI: bool = False) -> dict[str, str]: 

274 """Fill a dict containing main config key and absolute path of config schemas 

275 required by current appname. 

276 Those values will be used to build Cerberus validation schema in order to 

277 validate config file. 

278 """ 

279 main = self._config["appname"] 

280 

281 # Retrieve absolute path of hermes source directory 

282 appdir = os.path.realpath(os.path.dirname(__file__) + "/../../") 

283 

284 if isCLI: 

285 return {"hermes": f"{appdir}/lib/config/config-schema-cli.yml"} 

286 

287 schemas = { 

288 # Global config 

289 "hermes": f"{appdir}/lib/config/config-schema.yml", 

290 } 

291 

292 if main == "hermes-server": 

293 # Server config 

294 schemas |= {"hermes-server": f"{appdir}/server/config-schema-server.yml"} 

295 elif main.startswith("hermes-client-"): 

296 # Client 

297 clientname = main[len("hermes-client-") :] 

298 schemas |= { 

299 # Global client config 

300 "hermes-client": f"{appdir}/clients/config-schema-client.yml", 

301 # Client plugin config 

302 f"hermes-client-{clientname}": ( 

303 f"{appdir}/plugins/clients/{clientname}" 

304 f"/config-schema-client-{clientname}.yml" 

305 ), 

306 } 

307 

308 return schemas 

309 

310 @staticmethod 

311 def _mergeSchemas(schemas: dict[str, str]) -> dict[str, Any]: 

312 """Return a dict containing Cerberus validation schema for current appname""" 

313 schema: dict[str, Any] = {} 

314 for name, path in schemas.items(): 

315 with open(path) as f: 

316 curschema = yaml.load(f, Loader=YAMLUniqueKeyCSafeLoader) 

317 

318 if curschema is None: # A schema can be empty 

319 continue 

320 

321 if len(curschema) > 1 or list(curschema.keys())[0] != name: 

322 raise HermesInvalidConfigSchemaKey( 

323 f"The schema defined in '{path}' must define only the schema of " 

324 f"key '{name}'. It currently defines" 

325 f" {list(set(curschema.keys()) - set([name]))}" 

326 ) 

327 schema |= curschema 

328 

329 return schema 

330 

331 def _loadPlugin( 

332 self, 

333 pluginFamilyDir: str, 

334 pluginName: str, 

335 pluginSuperClass: type, 

336 pluginSubDictInConf: dict, 

337 pluginSettingsDotPath: str, 

338 ): 

339 """Generic plugin loader""" 

340 # Retrieve absolute path of hermes source directory 

341 appdir = os.path.realpath(os.path.dirname(__file__) + "/../../") 

342 

343 modulepath = f"plugins.{pluginFamilyDir}.{pluginName}.{pluginName}" 

344 try: 

345 module = importlib.import_module(modulepath) 

346 except ModuleNotFoundError as e: 

347 raise HermesPluginNotFoundError( 

348 f"Unable to load plugin '{pluginName}' of type " 

349 f"'{pluginFamilyDir}': {str(e)}" 

350 ) 

351 except Exception as e: 

352 raise HermesPluginError( 

353 f"Unable to load plugin '{pluginName}' of type '{pluginFamilyDir}'," 

354 f" probably due to a syntax error in plugin code: {str(e)}" 

355 ) 

356 

357 try: 

358 plugin_cls = getattr(module, module.HERMES_PLUGIN_CLASSNAME) 

359 except AttributeError as e: 

360 raise HermesPluginClassNotFoundError(str(e)) 

361 

362 path = ( 

363 f"{appdir}/plugins/{pluginFamilyDir}/{pluginName}" 

364 f"/config-schema-plugin-{pluginName}.yml" 

365 ) 

366 

367 with open(path) as f: 

368 schema = yaml.load(f, Loader=YAMLUniqueKeyCSafeLoader) 

369 

370 if schema is None: 

371 # Only when the schema file is empty, meaning there's no possible settings 

372 schema = {} 

373 

374 validator = Validator(schema) 

375 if not validator.validate(pluginSubDictInConf["settings"]): 

376 raise HermesConfigError(f"{pluginSettingsDotPath}: {validator.errors}") 

377 

378 pluginSubDictInConf["settings"] = validator.normalized( 

379 pluginSubDictInConf["settings"] 

380 ) 

381 pluginSubDictInConf["plugininstance"] = plugin_cls( 

382 pluginSubDictInConf["settings"] 

383 ) 

384 if not isinstance(pluginSubDictInConf["plugininstance"], pluginSuperClass): 

385 raise TypeError( 

386 f"Plugin <{pluginName}> is not a subclass of" 

387 f" '{pluginSuperClass.__name__}'" 

388 ) 

389 __hermes__.logger.info(f"Loaded plugin {pluginFamilyDir}/{pluginName}") 

390 

391 def _loadDatasourcesPlugins(self): 

392 """Load every Datasources plugins""" 

393 if self["appname"] != "hermes-server": 

394 return 

395 

396 for name, source in self["hermes"]["plugins"]["datasources"].items(): 

397 pluginname = source["type"] 

398 self._loadPlugin( 

399 pluginFamilyDir="datasources", 

400 pluginName=pluginname, 

401 pluginSuperClass=AbstractDataSourcePlugin, 

402 pluginSubDictInConf=source, 

403 pluginSettingsDotPath=f"hermes.plugins.datasources.{name}.settings", 

404 ) 

405 

406 def _loadMessageBusPlugins(self): 

407 """Load every MessageBus plugins""" 

408 if self["appname"] == "hermes-server": 

409 familydir = "messagebus_producers" 

410 superclass = AbstractMessageBusProducerPlugin 

411 else: 

412 familydir = "messagebus_consumers" 

413 superclass = AbstractMessageBusConsumerPlugin 

414 

415 for pluginname, source in self["hermes"]["plugins"]["messagebus"].items(): 

416 self._loadPlugin( 

417 pluginFamilyDir=familydir, 

418 pluginName=pluginname, 

419 pluginSuperClass=superclass, 

420 pluginSubDictInConf=source, 

421 pluginSettingsDotPath=( 

422 f"hermes.plugins.messagebus.{pluginname}.settings" 

423 ), 

424 ) 

425 # As only one can be registered, put instance one level upper 

426 self["hermes"]["plugins"]["messagebus"]["plugininstance"] = source[ 

427 "plugininstance" 

428 ] 

429 

430 def _loadAttributesPlugins(self): 

431 """Load every Attributes plugins""" 

432 jinjafilters = {} 

433 

434 for pluginname, source in self["hermes"]["plugins"]["attributes"].items(): 

435 self._loadPlugin( 

436 pluginFamilyDir="attributes", 

437 pluginName=pluginname, 

438 pluginSuperClass=AbstractAttributePlugin, 

439 pluginSubDictInConf=source, 

440 pluginSettingsDotPath=( 

441 f"hermes.plugins.attributes.{pluginname}.settings" 

442 ), 

443 ) 

444 

445 jinjafilters[pluginname] = source["plugininstance"].filter 

446 

447 self["hermes"]["plugins"]["attributes"]["_jinjafilters"] = jinjafilters