Coverage for server/datamodel.py: 77%
302 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any, Hashable, TYPE_CHECKING
25if TYPE_CHECKING: # pragma: no cover
26 # Only for type hints, won't import at runtime
27 from lib.config import HermesConfig
29from typing import Callable
31from jinja2 import StrictUndefined
32from jinja2.environment import Template
33import time
35from lib.datamodel.dataschema import Dataschema
36from lib.datamodel.dataobject import DataObject
37from lib.datamodel.dataobjectlist import DataObjectList
38from lib.datamodel.jinja import (
39 HermesNativeEnvironment,
40 Jinja,
41 HermesUnknownVarsInJinjaTemplateError,
42)
43from lib.plugins import AbstractDataSourcePlugin
44from lib.datamodel.datasource import Datasource
class HermesDataModelMissingPrimarykeyError(Exception):
    """Raised when the primary key attribute(s) of a datamodel type are not
    declared in the attrsmapping of every one of its sources"""
class HermesInvalidPrimarykeyTypeError(Exception):
    """Raised when a primary key value fetched from a datasource is not hashable,
    as primary keys must be hashable"""
class HermesDataModelInvalidQueryTypeError(Exception):
    """Raised by DatamodelFragment._runQuery() when it receives a querytype other
    than "fetch", "add", "delete" or "modify\""""
class DatamodelFragment:
    """Handle settings, data and access to remote source data of one datamodel type for
    one source.

    The data from several DatamodelFragment will then be consolidated and merged in and
    by Datamodel.
    """

    HERMES_RESERVED_JINJA_VARS = {
        "_SELF",
        "REMOTE_ATTRIBUTES",
        "ITEM_CACHED_VALUES",
        "ITEM_FETCHED_VALUES",
        "CACHED_VALUES",
        "FETCHED_VALUES",
    }
    """Jinja variables names reserved for internal use"""

    def __init__(
        self,
        dataobjtype: str,
        datasourcename: str,
        fragmentSettings: dict[str, str | dict],
        primarykeyattr: str | tuple[str, ...],
        datasourceplugin: AbstractDataSourcePlugin,
        attributesplugins: dict[str, Callable[..., Any]],
    ):
        """Create a new DatamodelFragment of specified dataobjtype, from specified
        datasourcename.

        To allow data to be fetched/committed properly, fragmentSettings (source
        settings) must be specified, as well as primarykeyattr, datasourceplugin and
        attributesplugins.

        dataobjtype: name of the datamodel type handled by this fragment
        datasourcename: name of the datasource this fragment reads from
        fragmentSettings: settings of this source. Keys read by this class are
            "fetch", "attrsmapping", "secrets_attrs", "cacheonly_attrs",
            "local_attrs", and optionally "commit_one" and "commit_all"
        primarykeyattr: primary key attribute name, or tuple of names for a
            composite primary key
        datasourceplugin: plugin instance used to run queries on the datasource
        attributesplugins: callables registered as Jinja filters
        """
        self._dataobjects: list[DataObject] = []
        self._datasourceplugin: AbstractDataSourcePlugin = datasourceplugin
        self._errorcontext: str = (
            f"hermes-server.datamodel.{dataobjtype}.{datasourcename}.attrsmapping"
        )
        self._jinjaenv: HermesNativeEnvironment = HermesNativeEnvironment(
            undefined=StrictUndefined
        )
        self._jinjaenv.filters |= attributesplugins
        self.datasourcename: str = datasourcename
        self._settings: dict[str, str | dict] = fragmentSettings
        # Same structure as _settings, with Jinja templates compiled
        self._compiledsettings = Jinja.compileIfJinjaTemplate(
            self._settings, None, self._jinjaenv, self._errorcontext, False, False
        )
        self._dataobjclass: type[DataObject] = self.__createDataObjectsubclass(
            dataobjtype, datasourcename, primarykeyattr
        )

        self.jinjavars_vars: set[str] = set()
        """List of jinja vars used in fragment's fetch vars"""
        self.jinjavars_query: set[str] = set()
        """List of jinja vars used in fragment's query"""

        # Compiled results are discarded: those calls are only made to collect the
        # jinja vars used in fetch "vars" and "query"
        _ = Jinja.compileIfJinjaTemplate(
            self._settings["fetch"].get("vars", {}),
            self.jinjavars_vars,
            self._jinjaenv,
            self._errorcontext,
            False,
            False,
        )
        _ = Jinja.compileIfJinjaTemplate(
            self._settings["fetch"].get("query", ""),
            self.jinjavars_query,
            self._jinjaenv,
            self._errorcontext,
            False,
            False,
        )

    def __createDataObjectsubclass(
        self, dataobjtype: str, datasourcename: str, primarykeyattr: str | tuple[str, ...]
    ) -> type[DataObject]:
        """Dynamically create a new subclass of DataObject class, named
        "<dataobjtype>_<datasourcename>", and set up its class attributes according
        to the fragment settings"""
        newcls: type[DataObject] = Dataschema.createSubclass(
            f"{dataobjtype}_{datasourcename}", DataObject
        )

        newcls.PRIMARYKEY_ATTRIBUTE = primarykeyattr
        newcls.REMOTE_ATTRIBUTES = set()
        # This call will fill newcls.REMOTE_ATTRIBUTES with remote vars met in
        # attrsmapping, ignoring Hermes reserved vars
        newcls.HERMES_TO_REMOTE_MAPPING = Jinja.compileIfJinjaTemplate(
            self._settings["attrsmapping"],
            newcls.REMOTE_ATTRIBUTES,
            self._jinjaenv,
            self._errorcontext,
            True,
            False,
            excludeFlatVars=self.HERMES_RESERVED_JINJA_VARS,
        )
        newcls.HERMES_ATTRIBUTES = set(self._settings["attrsmapping"].keys())
        newcls.SECRETS_ATTRIBUTES = set(self._settings["secrets_attrs"])
        newcls.CACHEONLY_ATTRIBUTES = set(self._settings["cacheonly_attrs"])
        newcls.LOCAL_ATTRIBUTES = set(self._settings["local_attrs"])

        __hermes__.logger.debug(
            f"Created dynamic class:\n"
            f" {newcls.__name__}:\n"
            f" - {newcls.PRIMARYKEY_ATTRIBUTE=}\n"
            f" - {newcls.HERMES_ATTRIBUTES=}\n"
            f" - {newcls.REMOTE_ATTRIBUTES=}\n"
            f" - {newcls.HERMES_TO_REMOTE_MAPPING=}\n"
            f" - {newcls.SECRETS_ATTRIBUTES=}\n"
            f" - {newcls.CACHEONLY_ATTRIBUTES=}\n"
            f" - {newcls.LOCAL_ATTRIBUTES=}"
        )
        return newcls

    def getDataobjClass(self) -> type[DataObject]:
        """Return DataObject subclass of current fragment"""
        return self._dataobjclass

    @staticmethod
    def _isHashable(value: Any) -> bool:
        """Return True if value can actually be hashed.

        isinstance(value, Hashable) isn't enough: it accepts tuples containing
        unhashable items (e.g. (1, [2])), which would fail later on hash().
        """
        try:
            hash(value)
        except TypeError:
            return False
        return True

    def fetch(self, cache: DataObjectList, globalcontext: dict[str, Any]):
        """Fetch data from current fragment source, and replace self._dataobjects
        content with the fetched objects.

        cache: cached objects of current type. Their native values are available
            as CACHED_VALUES to the fetch query and vars, and each object's cached
            values (if any) are passed as ITEM_CACHED_VALUES to the new object
        globalcontext: additional Jinja vars for the fetch query and vars

        Raises HermesInvalidPrimarykeyTypeError if a fetched primary key value is
        not hashable.
        """
        cached_values: list[dict[str, Any]] = cache.toNative()
        objcls = self.getDataobjClass()

        context = globalcontext | {
            "REMOTE_ATTRIBUTES": objcls.REMOTE_ATTRIBUTES,
            "CACHED_VALUES": cached_values,
        }
        query = self._compiledsettings["fetch"]["query"]
        if isinstance(query, Template):
            query = query.render(context)

        queryvars = Jinja.renderQueryVars(
            self._compiledsettings["fetch"]["vars"], context
        )
        fetcheddata = self._runQuery(self._settings["fetch"]["type"], query, queryvars)

        # As primary key may be a tuple, normalize it to a tuple of attribute names.
        # This doesn't depend on fetched entries, so it's done once.
        ispkeytuple = isinstance(objcls.PRIMARYKEY_ATTRIBUTE, tuple)
        if ispkeytuple:
            pkey_attrs = objcls.PRIMARYKEY_ATTRIBUTE
        else:
            pkey_attrs = (objcls.PRIMARYKEY_ATTRIBUTE,)

        self._dataobjects = []
        for objdata in fetcheddata:
            # Render each value of the primary key
            objpkeys = []
            for pkey_attr in pkey_attrs:
                remotepkeyattr = self._compiledsettings["attrsmapping"][pkey_attr]
                if isinstance(remotepkeyattr, Template):
                    # Render from compiled Jinja Template
                    objpkeys.append(remotepkeyattr.render(objdata))
                else:
                    # Raw value
                    objpkeys.append(objdata.get(remotepkeyattr))

            # Ensure pkeys values are hashable
            unhashable = {
                attr: value
                for attr, value in zip(pkey_attrs, objpkeys)
                if not self._isHashable(value)
            }
            if unhashable:
                err = (
                    f"Invalid value type for primary key(s) : {unhashable}."
                    " Primary keys must be 'hashable'"
                )
                __hermes__.logger.critical(err)
                raise HermesInvalidPrimarykeyTypeError(err)

            objpkey = tuple(objpkeys) if ispkeytuple else objpkeys[0]

            item_cache = cache.get(objpkey)
            objcontext = {
                "ITEM_CACHED_VALUES": item_cache.toNative() if item_cache else {}
            }
            self._dataobjects.append(
                objcls(from_remote=objdata, jinjaContextVars=objcontext)
            )

    def _runCommitQuery(self, step: str, context: dict[str, Any]):
        """Render the query and vars of specified commit step ("commit_one" or
        "commit_all") with specified Jinja context, and run it on the datasource"""
        query = self._compiledsettings[step].get("query", "")
        if isinstance(query, Template):
            query = query.render(context)

        queryvars = Jinja.renderQueryVars(self._compiledsettings[step]["vars"], context)
        self._runQuery(self._settings[step]["type"], query, queryvars)

    def commit_one(
        self, item_cached_values: dict[str, Any], item_fetched_values: dict[str, Any]
    ):
        """Commit that one object's data changes have successfully been sent to
        message bus, by running the "commit_one" query if one is configured.
        Does nothing otherwise."""
        if self._settings.get("commit_one") is None:
            return

        objcls = self.getDataobjClass()
        self._runCommitQuery(
            "commit_one",
            {
                "REMOTE_ATTRIBUTES": objcls.REMOTE_ATTRIBUTES,
                "ITEM_CACHED_VALUES": item_cached_values,
                "ITEM_FETCHED_VALUES": item_fetched_values,
            },
        )

    def commit_all(
        self, cached_values: list[dict[str, Any]], fetched_values: list[dict[str, Any]]
    ):
        """Commit that all fetched data has successfully been sent to message bus,
        by running the "commit_all" query if one is configured.
        Does nothing otherwise."""
        if self._settings.get("commit_all") is None:
            return

        objcls = self.getDataobjClass()
        self._runCommitQuery(
            "commit_all",
            {
                "REMOTE_ATTRIBUTES": objcls.REMOTE_ATTRIBUTES,
                "CACHED_VALUES": cached_values,
                "FETCHED_VALUES": fetched_values,
            },
        )

    def _runQuery(
        self, querytype: str, query: str, queryvars: dict[str, Any]
    ) -> list[dict[str, Any]] | None:
        """Run specified query with specified queryvars on datasource.
        querytype must be one of fetch, add, delete, modify.

        Returns None when querytype isn't "fetch",
        otherwise returns a list of dict containing each entry fetched, with
        REMOTE_ATTRIBUTES as keys, and corresponding fetched values as values.

        Raises HermesDataModelInvalidQueryTypeError on invalid querytype. This is
        checked before entering the datasource plugin context manager.
        """
        __hermes__.logger.debug(
            f"{self.getDataobjClass().__name__}:"
            f" _runQuery({querytype=}, {query=}, {queryvars=})"
        )
        if querytype not in ("fetch", "add", "delete", "modify"):
            raise HermesDataModelInvalidQueryTypeError(
                f"runQuery called with invalid querytype '{querytype}'"
            )

        fetcheddata = None
        starttime = time.time()
        with self._datasourceplugin:
            if querytype == "fetch":
                fetcheddata = self._datasourceplugin.fetch(query, queryvars)
            elif querytype == "add":
                self._datasourceplugin.add(query, queryvars)
            elif querytype == "delete":
                self._datasourceplugin.delete(query, queryvars)
            else:  # "modify", the only remaining valid querytype
                self._datasourceplugin.modify(query, queryvars)

        elapsedms = int(round(1000 * (time.time() - starttime)))
        if fetcheddata is None:
            __hermes__.logger.debug(
                f"{self.getDataobjClass().__name__}:"
                f" _runQuery() returned in {elapsedms} ms"
            )
        else:
            __hermes__.logger.debug(
                f"{self.getDataobjClass().__name__}:"
                f" _runQuery() returned {len(fetcheddata)} entries in {elapsedms} ms"
            )

        return fetcheddata
class Datamodel:
    """Load and build the Datamodel from config.

    In charge of:
        - generating Dataschema
        - retrieving remote data from all sources, and merging it
    """

    def __init__(self, config: "HermesConfig"):
        """Build the datamodel from config: create one DatamodelFragment per
        (objtype, source), set up the Dataschema, compile constraints templates, and
        create the Datasource (with cache enabled) that will hold the data"""
        self._fragments: dict[str, list[DatamodelFragment]] = {
            k: [] for k in config["hermes-server"]["datamodel"].keys()
        }

        self._config: "HermesConfig" = config
        self._jinjaenv: HermesNativeEnvironment = HermesNativeEnvironment(
            undefined=StrictUndefined
        )

        # Fill the _fragments dictionary
        datamodel: dict[str, Any] = config["hermes-server"]["datamodel"]
        for objtype, fragmentslist in self._fragments.items():
            for sourcename, sourcesettings in datamodel[objtype]["sources"].items():
                pkeyattr = datamodel[objtype]["primarykeyattr"]
                if isinstance(pkeyattr, list):
                    # Composite primary key is handled as a tuple
                    pkeyattr = tuple(pkeyattr)
                fragmentslist.append(
                    DatamodelFragment(
                        objtype,
                        sourcename,
                        sourcesettings,
                        pkeyattr,
                        config["hermes"]["plugins"]["datasources"][sourcename][
                            "plugininstance"
                        ],
                        config["hermes"]["plugins"]["attributes"]["_jinjafilters"],
                    )
                )

        # Consolidate _fragments data to set up the Dataschema
        self.dataschema: Dataschema = self.__setupSchema()
        """Current Dataschema"""

        # Compile Jinja template of integrity_constraints, and store vars of
        # merge_constraints for "lazy" generation of template vars
        self._compileJinja()

        # Load Datasource with cache
        self.data: Datasource = Datasource(
            schema=self.dataschema, enableTrashbin=False, enableCache=True
        )

    def __setupSchema(self) -> Dataschema:
        """Consolidate _fragments data to set up the Dataschema.

        Raises HermesDataModelMissingPrimarykeyError if a primary key attribute
        isn't in the attrsmapping of each source, and
        HermesUnknownVarsInJinjaTemplateError if a toString template uses an
        attribute that no source provides.
        """
        schema: dict[str, Any] = {}
        datamodel: dict[str, Any] = self._config["hermes-server"]["datamodel"]

        for objtype, fragments in self._fragments.items():
            count: dict[str, int] = {}  # attribute name -> number of sources having it
            secrets_attrs = set()
            cacheonly_attrs = set()
            local_attrs = set()

            # Save foreign keys, they'll be checked later
            foreign_keys: dict[str, list[str]] = {
                localkey: [v["from_objtype"], v["from_attr"]]
                for localkey, v in datamodel[objtype]["foreignkeys"].items()
            }

            # Consolidate attributes of every source
            for fragment in fragments:
                objcls = fragment.getDataobjClass()
                secrets_attrs |= objcls.SECRETS_ATTRIBUTES
                cacheonly_attrs |= objcls.CACHEONLY_ATTRIBUTES
                local_attrs |= objcls.LOCAL_ATTRIBUTES
                for attr in objcls.HERMES_ATTRIBUTES:
                    count[attr] = count.get(attr, 0) + 1

            # Ensure primarykey is in attrsmapping of each source.
            # count.get() is required: a key missing from every source must raise the
            # dedicated error, not a KeyError
            pkey = fragments[0].getDataobjClass().PRIMARYKEY_ATTRIBUTE
            pkey_attrs = pkey if isinstance(pkey, tuple) else (pkey,)
            for key in pkey_attrs:
                if count.get(key, 0) != len(fragments):
                    raise HermesDataModelMissingPrimarykeyError(
                        f"The primary key '{pkey}' must be fetched from each"
                        " datasource"
                    )

            # Compile toString jinja template
            jinjavars = set()
            tostringTpl = Jinja.compileIfJinjaTemplate(
                datamodel[objtype]["toString"],
                jinjavars,
                self._jinjaenv,
                f"hermes-server.datamodel.{objtype}.toString",
                False,
                False,
            )
            # Ensure jinja vars are known local attrs
            unknownattrs = jinjavars - set(count)
            if unknownattrs:
                raise HermesUnknownVarsInJinjaTemplateError(
                    "Unknown attributes met in 'hermes-server.datamodel"
                    f".{objtype}.toString' jinja template: {unknownattrs}"
                )
            schema[objtype] = {
                "HERMES_ATTRIBUTES": set(count),
                "SECRETS_ATTRIBUTES": secrets_attrs,
                "CACHEONLY_ATTRIBUTES": cacheonly_attrs,
                "LOCAL_ATTRIBUTES": local_attrs,
                "PRIMARYKEY_ATTRIBUTE": pkey,
                "FOREIGN_KEYS": foreign_keys,
                "TOSTRING": tostringTpl,
            }

        return Dataschema(schema)

    def _compileJinja(self):
        """Compile Jinja template of integrity_constraints, and store vars of
        merge_constraints for "lazy" generation of template vars.

        Modifies datamodel settings in config in place: adds the
        "merge_constraints_vars" and "integrity_constraints_vars" sets, and
        replaces "integrity_constraints" by its compiled version.
        """
        for dataobjtype, settings in self._config["hermes-server"]["datamodel"].items():
            # Fill merge_constraints_vars that will contain required vars by different
            # merge_constraints specified in sources. This will allow lazy generation
            # of Jinja env vars. Sources keep their own compiled version in their
            # DatamodelFragment, so the compiled result is discarded here.
            settings["merge_constraints_vars"] = set()
            for srcname, srcsettings in settings["sources"].items():
                Jinja.compileIfJinjaTemplate(
                    var=srcsettings["merge_constraints"],
                    flatvars_set=settings["merge_constraints_vars"],
                    jinjaenv=self._jinjaenv,
                    errorcontext=(
                        f"hermes-server.datamodel.{dataobjtype}.sources.{srcname}"
                        ".merge_constraints"
                    ),
                    allowOnlyOneTemplate=False,
                    allowOnlyOneVar=False,
                )

            # Compile integrity_constraints templates and fill
            # integrity_constraints_vars that will contain required vars.
            # This will allow lazy generation of Jinja env vars
            settings["integrity_constraints_vars"] = set()
            settings["integrity_constraints"] = Jinja.compileIfJinjaTemplate(
                var=settings["integrity_constraints"],
                flatvars_set=settings["integrity_constraints_vars"],
                jinjaenv=self._jinjaenv,
                errorcontext=(
                    f"hermes-server.datamodel.{dataobjtype}.integrity_constraints"
                ),
                allowOnlyOneTemplate=False,
                allowOnlyOneVar=False,
            )

    def fetch(self):
        """Fetch data from all sources, enforce merge and integrity constraints, and
        store merged data in 'data' attribute.

        For each objtype, in dataschema.objectlistTypes order:
        1. fetch the data of each fragment
        2. enforce merge constraints
        3. merge fragments data
        4. replace inconsistencies and merge conflicts by cached values

        Integrity constraints are enforced once all objtypes have been processed.
        """
        # Load data starting in specific order to minimize inconsistencies if any
        # modifications are processed in the same time
        for objtype, objlistcls in self.dataschema.objectlistTypes.items():
            settings = self._config["hermes-server"]["datamodel"][objtype]
            dontMergeOnConflict = settings["on_merge_conflict"] == "use_cached_entry"
            cache = self.data.cache[objtype]

            # Fetch data
            starttime = time.time()
            for fragment in self._fragments[objtype]:
                context = self._buildFetchContext(objtype, fragment)
                fragment.fetch(cache, context)  # Fetch fragment data from remote source
            elapsedms = int(round(1000 * (time.time() - starttime)))
            __hermes__.logger.debug(
                f"Fetched and converted all <{objtype}> data in {elapsedms} ms"
            )

            # Enforce merge constraints, if any. Contains pkeys of filtered objects.
            mergeFiltered: set[Any] = self._enforceMergeConstraints(
                objtype, objlistcls, settings
            )

            # Merge data
            starttime = time.time()
            objlist: DataObjectList | None = None
            for fragment in self._fragments[objtype]:
                if objlist is None:  # First fragment
                    objlist = objlistcls(objlist=fragment._dataobjects)
                else:  # other fragments than first
                    mergeFiltered |= objlist.mergeWith(
                        objlist=fragment._dataobjects,
                        pkeyMergeConstraint=fragment._settings["pkey_merge_constraint"],
                        dontMergeOnConflict=dontMergeOnConflict,
                    )
            objlist.mergeFiltered |= mergeFiltered
            elapsedms = int(round(1000 * (time.time() - starttime)))
            __hermes__.logger.debug(
                f"Merged all <{objtype}> data in {elapsedms} ms: filtered"
                f" {len(mergeFiltered)} item(s)"
            )

            # Store merged data in current Datasource var
            self.data[objtype] = objlist

            # Replace inconsistencies and merge conflicts by cache values
            self.data[objtype].replaceInconsistenciesByCachedValues(
                self.data.cache[objtype]
            )

        self._enforceIntegrityConstraints()

    def _buildFetchContext(
        self, objtype: str, fragment: DatamodelFragment
    ) -> dict[str, Any]:
        """Return the global Jinja context of fragment's fetch: for each objtype,
        "<objtype>_pkeys" and "<objtype>" vars, only when used in fragment's fetch
        "query" or "vars".

        Objtypes declared from objtype onwards in dataschema haven't been processed
        yet: their vars are set empty and a warning is logged.
        """
        requiredvars = fragment.jinjavars_vars | fragment.jinjavars_query
        context: dict[str, Any] = {}
        alreadyProcessed = True
        for context_objtype in self.dataschema.objectlistTypes:
            if context_objtype == objtype:
                # Current and remaining context_objtype haven't been processed yet
                alreadyProcessed = False

            # Generate context only if required
            if context_objtype + "_pkeys" in requiredvars:
                if alreadyProcessed:
                    context[context_objtype + "_pkeys"] = self.data[
                        context_objtype
                    ].getPKeys()
                else:
                    __hermes__.logger.warning(
                        f"You're trying to use '{context_objtype}_pkeys' var"
                        " of an objtype declared after the current one"
                        f" ({objtype}) in datamodel, which has therefore not"
                        " yet been processed. Will use an empty var."
                    )
                    context[context_objtype + "_pkeys"] = set()
            if context_objtype in requiredvars:
                if alreadyProcessed:
                    context[context_objtype] = self.data[context_objtype].toNative()
                else:
                    __hermes__.logger.warning(
                        f"You're trying to use '{context_objtype}' var of "
                        "an objtype declared after the current one "
                        f"({objtype}) in datamodel, which has therefore not"
                        " yet been processed. Will use an empty var."
                    )
                    context[context_objtype] = []
        return context

    def _enforceMergeConstraints(
        self,
        objtype: str,
        objlistcls: type[DataObjectList],
        settings: dict[str, Any],
    ) -> set[Any]:
        """Remove from fragments of objtype the objects that don't satisfy their
        source's merge constraints, looping until no more object is removed.

        Returns the set of primary keys of removed objects (empty when no source
        of objtype has merge constraints).
        """
        mergeFiltered: set[Any] = set()
        fragments = self._fragments[objtype]
        if not any(
            len(fragment._compiledsettings["merge_constraints"]) > 0
            for fragment in fragments
        ):
            return mergeFiltered

        starttime = time.time()
        requiredvars = settings["merge_constraints_vars"]
        needSelf = "_SELF" in requiredvars
        hasChanged = True
        # Loop until no change is made, as removing an object may invalidate
        # constraints of others
        while hasChanged:
            hasChanged = False

            # Fill Jinja vars dict
            jinjavars: dict[str, Any] = {}
            for fragment in fragments:
                dataobjlist = objlistcls(objlist=fragment._dataobjects)
                # Generate vars only if required
                if fragment.datasourcename + "_pkeys" in requiredvars:
                    jinjavars[fragment.datasourcename + "_pkeys"] = (
                        dataobjlist.getPKeys()
                    )
                if fragment.datasourcename in requiredvars:
                    jinjavars[fragment.datasourcename] = dataobjlist.toNative()

            # Apply constraints
            for fragment in fragments:
                constraints = fragment._compiledsettings["merge_constraints"]
                if not constraints:
                    continue
                toRemove = set()
                for obj in fragment._dataobjects:
                    # Generate _SELF var only if required
                    if needSelf:
                        jinjavars["_SELF"] = obj.toNative()
                    for constraint in constraints:
                        if not constraint.render(jinjavars):
                            toRemove.add(obj)
                            break
                if toRemove:
                    hasChanged = True
                    for obj in toRemove:
                        fragment._dataobjects.remove(obj)
                        mergeFiltered.add(obj.getPKey())

        elapsedms = int(round(1000 * (time.time() - starttime)))
        __hermes__.logger.debug(
            f"Enforced <{objtype}> merge constraints in {elapsedms} ms:"
            f" filtered {len(mergeFiltered)} item(s)"
        )
        return mergeFiltered

    def _enforceIntegrityConstraints(self):
        """Remove from self.data the objects that don't satisfy their objtype's
        integrity constraints, looping until no more object is removed. Pkeys of
        removed objects are added to integrityFiltered of their objtype's list."""
        datamodel = self._config["hermes-server"]["datamodel"]
        objtypes = self.dataschema.objectlistTypes
        if not any(len(datamodel[t]["integrity_constraints"]) > 0 for t in objtypes):
            return

        starttime = time.time()

        # Store all integrity constraints vars
        all_integrity_vars = set()
        for objtype in objtypes:
            all_integrity_vars.update(datamodel[objtype]["integrity_constraints_vars"])

        hasChanged = True
        # Loop until no change is made, as removing an object may invalidate
        # constraints of others
        while hasChanged:
            hasChanged = False

            # Fill Jinja vars dict
            jinjavars: dict[str, Any] = {}
            for objtype in objtypes:
                # Generate vars only if required
                if objtype + "_pkeys" in all_integrity_vars:
                    jinjavars[objtype + "_pkeys"] = self.data[objtype].getPKeys()
                if objtype in all_integrity_vars:
                    jinjavars[objtype] = self.data[objtype].toNative()

            # Apply constraints
            for objtype in objtypes:
                settings = datamodel[objtype]
                constraints = settings["integrity_constraints"]
                if not constraints:
                    continue

                needSelf = "_SELF" in settings["integrity_constraints_vars"]
                integrityFiltered = set()
                for obj in self.data[objtype]:
                    # Generate _SELF var only if required
                    if needSelf:
                        jinjavars["_SELF"] = obj.toNative()
                    for constraint in constraints:
                        if not constraint.render(jinjavars):
                            integrityFiltered.add(obj.getPKey())
                            break
                if integrityFiltered:
                    hasChanged = True
                    for pkey in integrityFiltered:
                        self.data[objtype].removeByPkey(pkey)
                    __hermes__.logger.debug(
                        f"Integrity constraints: filtered {len(integrityFiltered)}"
                        f" item(s) from {objtype}"
                    )
                    self.data[objtype].integrityFiltered |= integrityFiltered

        elapsedms = int(round(1000 * (time.time() - starttime)))
        __hermes__.logger.debug(f"Integrity constraints enforced in {elapsedms} ms")

    def commit_one(self, obj: DataObject):
        """Commit that specified 'obj' data changes have successfully been sent to
        message bus, on each source of obj's type.

        Raises TypeError if obj isn't an instance of a datamodel type.
        """
        for objtype, objcls in self.dataschema.objectTypes.items():
            if isinstance(obj, objcls):
                break
        else:
            raise TypeError(
                f"Specified object {repr(obj)} has invalid type '{type(obj)}'"
            )

        cachedobj: DataObject | None = self.data.cache[objtype].get(obj.getPKey())
        cachedvalues = cachedobj.toNative() if cachedobj else {}
        fetchedvalues = obj.toNative()

        for fragment in self._fragments[objtype]:
            fragment.commit_one(cachedvalues, fetchedvalues)

    def commit_all(self, objtype: str):
        """Commit that all fetched data of specified objtype has successfully been
        sent to message bus, on each source of objtype"""
        cachedvalues = self.data.cache[objtype].toNative()
        fetchedvalues = self.data[objtype].toNative()

        for fragment in self._fragments[objtype]:
            fragment.commit_all(cachedvalues, fetchedvalues)