Coverage for clients/datamodel.py: 92%
398 statements
« prev ^ index » next — coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from lib.config import HermesConfig
25from copy import deepcopy
26from datetime import datetime
27from jinja2 import StrictUndefined
28from jinja2.environment import Template
29from typing import Any
31from clients.errorqueue import ErrorQueue
32from lib.datamodel.dataobject import DataObject
33from lib.datamodel.dataobjectlist import DataObjectList
34from lib.datamodel.dataschema import Dataschema
35from lib.datamodel.datasource import Datasource
36from lib.datamodel.diffobject import DiffObject
37from lib.datamodel.event import Event
38from lib.datamodel.serialization import LocalCache
39from lib.datamodel.jinja import (
40 HermesNativeEnvironment,
41 Jinja,
42 HermesUnknownVarsInJinjaTemplateError,
43)
class InvalidDataError(Exception):
    """Raised when a case that should never happen occurs (a critical bug),
    e.g. a local cached object with no matching remote object in cache."""
class Datamodel:
    """Load and build the Datamodel from config, and validates it according to remote
    Dataschema.

    In charge of:
    - handling updates of Datamodel (hermes-client.datamodel changes in config file)
    - handling updates of remote Dataschema (hermes-server.datamodel in server
      config file)
    - converting a remote Event to a local one
    - handling remote and local data caches (remotedata and localdata attributes,
      each of Datasource type)
    """
63 def __init__(
64 self,
65 config: HermesConfig,
66 ):
67 """Build the datamodel from config"""
69 self.unknownRemoteTypes: set[str] = set()
70 """List remote types set in client Datamodel, but missing in remote
71 Dataschema"""
72 self.unknownRemoteAttributes: dict[str, set[str]] = set()
73 """List remote attributes set in client Datamodel, but missing in remote
74 Dataschema. The dict key contains the remote type, the set contains the missing
75 attributes"""
77 self._config: HermesConfig = config
79 self._rawdatamodel: dict[str, Any] = self._config["hermes-client"]["datamodel"]
80 """Local datamodel dictionary, as found in config"""
82 self._datamodel: dict[str, Any]
83 """Local datamodel dictionary, with compiled Jinja templates"""
85 self._jinjaenv: HermesNativeEnvironment = HermesNativeEnvironment(
86 undefined=StrictUndefined
87 )
88 if "hermes" in self._config:
89 self._jinjaenv.filters |= self._config["hermes"]["plugins"]["attributes"][
90 "_jinjafilters"
91 ]
93 self.remote_schema: Dataschema = Dataschema.loadcachefile("_dataschema")
94 """Remote schema"""
95 self.local_schema: Dataschema | None = None
96 """Local schema"""
98 self.remotedata: Datasource | None = None
99 """Datasource of remote objects"""
100 self.localdata: Datasource | None = None
101 """Datasource of local objects"""
103 self.remotedata_complete: Datasource | None = None
104 """Datasource of remote objects as it should be without error"""
105 self.localdata_complete: Datasource | None = None
106 """Datasource of local objects as it should be without error"""
108 self.errorqueue: ErrorQueue | None = None
109 """Queue of Events in error"""
111 self.typesmapping: dict[str, str]
112 """Mapping of datamodel types: hermes-server type as key, hermes-client type as
113 value"""
114 self._remote2local: dict[str, dict[str, list[str]]]
115 """Mapping with remote type name as key, and dict containing remote attrname as
116 key and local attrname as value. Example:
117 {
118 remote_type_name: {
119 remote_attrname1: client_attrname1,
120 ...
121 },
122 ...
123 }
124 """
126 if self.hasRemoteSchema():
127 self._mergeWithSchema(self.remote_schema)
129 def hasRemoteSchema(self) -> bool:
130 """Returns true if remote schema has data"""
131 return len(self.remote_schema.schema) != 0
133 def diffFrom(self, other: "Datamodel") -> DiffObject:
134 """Return DiffObject with differences (attributes names) of current instance
135 from another"""
136 diff = DiffObject()
138 s = self._rawdatamodel.keys()
139 o = other._rawdatamodel.keys()
140 commonattrs = s & o
142 diff.appendRemoved(o - s)
143 diff.appendAdded(s - o)
145 for k, v in self._rawdatamodel.items():
146 if k in commonattrs and DataObject.isDifferent(v, other._rawdatamodel[k]):
147 diff.appendModified(k)
149 return diff
151 def loadLocalData(self):
152 """Load or reload localdata and localdata_complete from cache"""
153 self.localdata = Datasource(
154 schema=self.local_schema, enableTrashbin=True, cacheFilePrefix="__"
155 )
156 self.localdata.loadFromCache()
157 self.localdata_complete = Datasource(
158 schema=self.local_schema,
159 enableTrashbin=True,
160 cacheFilePrefix="__",
161 cacheFileSuffix="_complete__",
162 )
163 self.localdata_complete.loadFromCache()
165 def saveLocalData(self):
166 """Save localdata and localdata_complete when they're set"""
167 if self.localdata is not None:
168 self.localdata.save()
169 if self.localdata_complete is not None:
170 self.localdata_complete.save()
172 def loadRemoteData(self):
173 """Load or reload remotedata and remotedata_complete from cache"""
174 self.remotedata = Datasource(schema=self.remote_schema, enableTrashbin=True)
175 self.remotedata.loadFromCache()
176 self.remotedata_complete = Datasource(
177 schema=self.remote_schema,
178 enableTrashbin=True,
179 cacheFileSuffix="_complete__",
180 )
181 self.remotedata_complete.loadFromCache()
182 if self.errorqueue is not None:
183 self.errorqueue.updateDatasources(
184 self.remotedata,
185 self.remotedata_complete,
186 self.localdata,
187 self.localdata_complete,
188 )
190 def saveRemoteData(self):
191 """Save remotedata and remotedata_complete when they're set"""
192 if self.remotedata is not None:
193 self.remotedata.save()
194 if self.remotedata_complete is not None:
195 self.remotedata_complete.save()
197 def loadLocalAndRemoteData(self):
198 """Load or reload localdata, localdata_complete, remotedata and
199 remotedata_complete from cache"""
200 self.loadLocalData()
201 self.loadRemoteData()
203 def saveLocalAndRemoteData(self):
204 """Save localdata, localdata_complete, remotedata and remotedata_complete
205 from cache when they're set"""
206 self.saveLocalData()
207 self.saveRemoteData()
209 def loadErrorQueue(self):
210 """Load or reload error queue from cache"""
211 if self.hasRemoteSchema():
212 self.errorqueue = ErrorQueue.loadcachefile(
213 "_errorqueue",
214 typesMapping=self.typesmapping,
215 remotedata_complete=self.remotedata_complete,
216 remotedata=self.remotedata,
217 localdata=self.localdata,
218 localdata_complete=self.localdata_complete,
219 autoremediate=self._config["hermes-client"]["autoremediation"],
220 )
221 else:
222 self.errorqueue = None
224 def saveErrorQueue(self):
225 """Save error queue to cache"""
226 if self.errorqueue is not None:
227 self.errorqueue.savecachefile()
    def _mergeWithSchema(self, remote_schema: Dataschema):
        """Build or update the datamodel according to specified remote_schema.

        Rebuilds the compiled datamodel dict, the conversion mappings and the
        local schema, then — only when primary keys changed in the remote
        schema — migrates the locally cached objects to the new primary keys
        before reloading every cache.
        """
        prev_remote_schema = self.remote_schema
        self.remote_schema = remote_schema
        self._remote2local = {}

        # Log schema differences and collect the types whose primary key changed
        prev_remote_pkeys, new_remote_pkeys = self._checkForSchemaChanges(
            prev_remote_schema, self.remote_schema
        )

        self._fillDatamodelDict()  # Filled upon config only
        self._fillConversionVars()  # Filled upon config only

        self.local_schema = self._setupLocalSchema()

        # Update pkeys when necessary
        if new_remote_pkeys:
            __hermes__.logger.info("Updating local cache primary keys")

            # Persist current state before mutating cached objects
            self.saveLocalAndRemoteData()
            self.saveErrorQueue()

            new_local_pkeys: dict[str, str | tuple[str]] = {}
            l_pkeys_to_add: dict[str, set[str]] = {}
            l_pkeys_to_remove: dict[str, set[str]] = {}
            local_types: dict[str, set[type[DataObject]]] = {}
            for r_objtype in new_remote_pkeys.keys():
                if r_objtype not in self.typesmapping:
                    # Remote type not used by this client's datamodel
                    continue

                # Determine local objtype and its new primary key
                l_objtype = self.typesmapping[r_objtype]
                new_local_pkeys[l_objtype] = self.local_schema.schema[l_objtype][
                    "PRIMARYKEY_ATTRIBUTE"
                ]

                # Compute local pkeys to add and to remove for each local data type
                r_prev_pkeys = prev_remote_pkeys[r_objtype]
                r_new_pkeys = new_remote_pkeys[r_objtype]
                # Normalize single-attribute pkeys to tuples
                if type(r_prev_pkeys) is str:
                    r_prev_pkeys = (r_prev_pkeys,)
                if type(r_new_pkeys) is str:
                    r_new_pkeys = (r_new_pkeys,)
                # Local pkey attributes carry the "_pkey_" prefix
                l_prev_pkeys = set([f"_pkey_{pkey}" for pkey in r_prev_pkeys])
                l_new_pkeys = set([f"_pkey_{pkey}" for pkey in r_new_pkeys])
                l_pkeys_to_add[l_objtype] = l_new_pkeys - l_prev_pkeys
                l_pkeys_to_remove[l_objtype] = l_prev_pkeys - l_new_pkeys

                local_types[l_objtype] = set()

                # Add new pkey attributes and values to localdata objects
                for src in (self.localdata, self.localdata_complete):
                    for type_prefix in ("", "trashbin_"):
                        obj: DataObject
                        for obj in src[f"{type_prefix}{l_objtype}"]:
                            if type(obj) not in local_types[l_objtype]:
                                # First object of this concrete class: extend the
                                # class-level attribute list once
                                local_types[l_objtype].add(type(obj))
                                type(obj).HERMES_ATTRIBUTES |= l_pkeys_to_add[
                                    l_objtype
                                ]
                            # Get corresponding remote object from cache
                            (_, r_obj) = Datamodel.getObjectFromCacheOrTrashbin(
                                self.remotedata_complete, r_objtype, obj.getPKey()
                            )
                            if r_obj is None:
                                # Should never happen : if so, it's a bug
                                msg = (
                                    f"BUG ! No matching of local object {repr(obj)}"
                                    " found in remotedata_complete cache. The client is"
                                    " probably broken"
                                )
                                __hermes__.logger.critical(msg)
                                raise InvalidDataError(msg)

                            for pkey in r_new_pkeys:
                                try:
                                    # Get pkey value from remote object
                                    value = getattr(r_obj, pkey)
                                except AttributeError:
                                    # Should never happen : if so, it's a bug
                                    msg = (
                                        "BUG ! No value exist in remote cache for"
                                        f" attribute '{pkey}' of object {r_obj}. The"
                                        " client is probably broken"
                                    )
                                    __hermes__.logger.critical(msg)
                                    raise InvalidDataError(msg)
                                # Store pkey value to local object
                                setattr(obj, f"_pkey_{pkey}", value)

            # Update PRIMARYKEY_ATTRIBUTE of each local type
            for l_objtype, l_types in local_types.items():
                for l_type in l_types:
                    l_type.PRIMARYKEY_ATTRIBUTE = new_local_pkeys[l_objtype]

                # Remove previous pkey attributes that are not used anymore from
                # localdata objects
                for src in (self.localdata, self.localdata_complete):
                    for type_prefix in ("", "trashbin_"):
                        obj: DataObject
                        for obj in src[f"{type_prefix}{l_objtype}"]:
                            for pkey in l_pkeys_to_remove[l_objtype]:
                                try:
                                    delattr(obj, pkey)
                                except AttributeError:
                                    # Attribute may legitimately be absent
                                    pass

            # Remove previous pkey attributes that are not used anymore from
            # HERMES_ATTRIBUTES of each local type
            for l_objtype, l_types in local_types.items():
                for l_type in l_types:
                    l_type.HERMES_ATTRIBUTES -= l_pkeys_to_remove[l_objtype]

            self.saveLocalData()

            __hermes__.logger.info("Updating changed primary keys in error queue")
            self.errorqueue.updatePrimaryKeys(
                new_remote_pkeys,
                self.remotedata,
                self.remotedata_complete,
                new_local_pkeys,
                self.localdata,
                self.localdata_complete,
            )

            # Save and reload error queue
            self.saveErrorQueue()
            self.loadErrorQueue()

        # Load local and remote Datasource caches
        self.loadLocalAndRemoteData()
359 def updateSchema(self, remote_schema: Dataschema):
360 """Build or update the datamodel according to specified remote_schema.
361 Data caches (locadata, locadata_complete, remotedata and remotedata_complete)
362 will be saved and reloaded to be updated according to new schema.
363 Remote and local schemas caches will be saved.
364 """
365 # Save current data before updating schema and reloading them
366 self.saveLocalAndRemoteData()
367 self._mergeWithSchema(remote_schema)
368 self.remote_schema.savecachefile()
370 def forcePurgeOfTrashedObjectsWithoutNewPkeys(
371 self, oldschema: Dataschema | None, newschema: Dataschema
372 ) -> dict[str, set[Any]]:
373 """On schema update, when primary key have changed, the trashed objects may not
374 contain the value of the new primary key attribute(s). This function will
375 change the trashbin timestamp of all those objects to force their removal.
376 Returns True if a trashbin purge is required, False otherwise
377 """
378 isTrashbinPurgeRequired: bool = False
379 if oldschema is None:
380 return False
382 diff = newschema.diffFrom(oldschema)
384 if not (diff and diff.modified):
385 return False
387 old: dict[str, Any] = oldschema.schema
388 new: dict[str, Any] = newschema.schema
389 for objtype in diff.modified:
390 if objtype not in self.typesmapping:
391 continue
392 npkey = new[objtype]["PRIMARYKEY_ATTRIBUTE"]
393 opkey = old[objtype]["PRIMARYKEY_ATTRIBUTE"]
394 if not DataObject.isDifferent(npkey, opkey):
395 continue
396 npkeys = (npkey,) if type(npkey) is str else npkey
397 obj: DataObject
398 for obj in self.remotedata[f"trashbin_{objtype}"]:
399 for pkey in npkeys:
400 if not hasattr(obj, pkey):
401 __hermes__.logger.warning(
402 f"Object {repr(obj)} of type '{objtype}' in trashbin will"
403 " be purged, as it doesn't have the new primary key value"
404 )
405 isTrashbinPurgeRequired = True
406 obj._trashbin_timestamp = datetime(year=1, month=1, day=1)
407 break
408 return isTrashbinPurgeRequired
    def _checkForSchemaChanges(
        self, oldschema: Dataschema | None, newschema: Dataschema
    ) -> tuple[dict[str, str | tuple[str]], dict[str, str | tuple[str]]]:
        """Log every difference between the two remote schemas, purge cache
        files of removed types, and save/reload the remote data caches when new
        secrets attributes appeared.

        Returns a tuple of two dicts :
        - first dict with remote types as key, and previous remote primary key
          attribute as value
        - second dict with remote types as key, and new remote primary key attribute as
          value
        Both dicts only contain the types whose primary key changed; they are
        empty when oldschema is None.
        """
        previouspkeys = {}
        newpkeys = {}
        if oldschema is None:
            # First run: no previous schema to compare against
            return (previouspkeys, newpkeys)

        diff = newschema.diffFrom(oldschema)

        if diff:
            old: dict[str, Any] = oldschema.schema
            new: dict[str, Any] = newschema.schema

            if diff.added:
                __hermes__.logger.info(f"Types added in Dataschema: {diff.added}")

            if diff.removed:
                __hermes__.logger.info(
                    f"Types removed from Dataschema: {diff.removed},"
                    " purging cache files"
                )
                self.purgeOldCacheFiles(diff.removed)

            if diff.modified:
                for objtype in diff.modified:
                    n = new[objtype]
                    o = old[objtype]
                    # HERMES_ATTRIBUTES: log attribute additions/removals
                    added = n["HERMES_ATTRIBUTES"] - o["HERMES_ATTRIBUTES"]
                    removed = o["HERMES_ATTRIBUTES"] - n["HERMES_ATTRIBUTES"]
                    if added:
                        __hermes__.logger.info(
                            f"New attributes in dataschema type '{objtype}': {added}"
                        )
                    if removed:
                        __hermes__.logger.info(
                            f"Removed attributes from dataschema type '{objtype}':"
                            f" {removed}"
                        )

                    # SECRETS_ATTRIBUTES
                    added = n["SECRETS_ATTRIBUTES"] - o["SECRETS_ATTRIBUTES"]
                    removed = o["SECRETS_ATTRIBUTES"] - n["SECRETS_ATTRIBUTES"]
                    if added:
                        __hermes__.logger.info(
                            f"New secrets attributes in dataschema type '{objtype}':"
                            f" {added}"
                        )
                        # We need to purge attribute from cache: as cache is loaded with
                        # attribute set up as SECRET, we just have to save the cache
                        # (attr won't be saved anymore, as it's SECRET) and reload
                        # cache to "forget" values loaded from previous cache
                        self.saveRemoteData()
                        self.loadRemoteData()
                    if removed:
                        __hermes__.logger.info(
                            "Removed secrets attributes from dataschema type"
                            f" '{objtype}': {removed}"
                        )

                    # PRIMARYKEY_ATTRIBUTE: collect changed pkeys for the caller
                    npkey = n["PRIMARYKEY_ATTRIBUTE"]
                    opkey = o["PRIMARYKEY_ATTRIBUTE"]
                    if DataObject.isDifferent(npkey, opkey):
                        previouspkeys[objtype] = opkey
                        newpkeys[objtype] = npkey
                        __hermes__.logger.info(
                            "New primary key attribute in dataschema type"
                            f" '{objtype}': {npkey}"
                        )

        return (previouspkeys, newpkeys)
    def convertEventToLocal(
        self,
        event: Event,
        new_obj: DataObject | None = None,
        allowEmptyEvent: bool = False,
    ) -> Event | None:
        """Convert specified remote event to local one.

        Each remote attribute is mapped to its local attribute(s) through
        self._remote2local; attrsmapping entries that are compiled Jinja
        templates are rendered, plain entries copy the remote value as-is.

        If new_obj is provided, it must contains all the new remote object values,
        and will only be used to render Jinja Templates.
        Returns None if the remote type is unknown, or if the local event doesn't
        contain any attribute and allowEmptyEvent is False.
        """
        if event.objtype not in self.typesmapping:
            __hermes__.logger.debug(
                f"Unknown {event.objtype=}. Known are {self.typesmapping}"
            )
            return None  # Unknown type

        objtype = self.typesmapping[event.objtype]

        # Handle that event.objattrs is 1 depth deeper for "modified" events
        if event.eventtype == "modified":
            sources = ("added", "modified", "removed")
            objattrs = {"added": {}, "modified": {}, "removed": {}}
        else:
            sources = (None,)
            objattrs = {}

        hasContent: bool = False
        for source in sources:
            if source is None:
                src = event.objattrs
            else:
                src = event.objattrs[source]

            # Hack to handle Jinja templates containing only static data: the
            # None key maps the templates that reference no remote attribute
            if None in self._remote2local[event.objtype] and event.eventtype == "added":
                loopsrc = src.copy()
                loopsrc[None] = None
            else:
                loopsrc = src

            for k, v in loopsrc.items():
                if k in self._remote2local[event.objtype]:
                    # A remote attr may feed several local attrs
                    for dest in self._remote2local[event.objtype][k]:
                        remoteattr = self._datamodel[objtype]["attrsmapping"][dest]
                        if isinstance(
                            remoteattr, Template
                        ):  # May be a compiled Jinja Template
                            if new_obj is None:
                                val = remoteattr.render(src)
                            else:
                                # We must provide all new object vars values to
                                # render a Jinja Template computed from several vars,
                                # in case of "modified" event changing the value of
                                # only one var value used by the template.
                                # The event objattrs won't be enough in this specific
                                # case.
                                val = remoteattr.render(new_obj.toNative())

                            # Drop None entries from rendered lists
                            if type(val) is list:
                                val = [v for v in val if v is not None]

                            if val is None or val == []:
                                # No value
                                if event.eventtype == "modified":
                                    objattrs["removed"].update({dest: val})
                                elif event.eventtype == "removed":
                                    objattrs.update({dest: val})
                            else:
                                # In modified events, we have to determine if the
                                # attribute is added or modified
                                if event.eventtype == "modified":
                                    _, cachedObj = self.getObjectFromCacheOrTrashbin(
                                        self.localdata_complete,
                                        self.typesmapping[event.objtype],
                                        event.objpkey,
                                    )

                                    if cachedObj is not None and hasattr(
                                        cachedObj, dest
                                    ):
                                        # Ensure the value has changed
                                        previousVal = getattr(cachedObj, dest)
                                        if DataObject.isDifferent(previousVal, val):
                                            objattrs["modified"].update({dest: val})
                                            hasContent = True
                                    else:
                                        # Attr is added
                                        objattrs["added"].update({dest: val})
                                        hasContent = True
                                else:
                                    objattrs.update({dest: val})
                                    hasContent = True
                        else:
                            # Plain mapping: copy the remote value untouched
                            if source is None:
                                objattrs.update({dest: v})
                            else:
                                objattrs[source].update({dest: v})
                            hasContent = True

        res = None
        if hasContent or allowEmptyEvent or event.eventtype == "removed":
            res = Event(
                evcategory=event.evcategory,
                eventtype=event.eventtype,
                objattrs=objattrs,
            )
            # Propagate event metadata to the converted event
            res.objtype = objtype
            res.objpkey = event.objpkey
            res.objrepr = str(res.objpkey)
            res.timestamp = event.timestamp
            res.step = event.step
            res.isPartiallyProcessed = event.isPartiallyProcessed
        return res
605 def createLocalDataobject(
606 self, objtype: str, objattrs: dict[str:Any]
607 ) -> DataObject:
608 """Returns instance of specified local Dataobject type from specified
609 attributes"""
610 return self.createDataobject(self.local_schema, objtype, objattrs)
612 def createRemoteDataobject(
613 self, objtype: str, objattrs: dict[str:Any]
614 ) -> DataObject:
615 """Returns instance of specified remote Dataobject type from specified
616 attributes"""
617 return self.createDataobject(self.remote_schema, objtype, objattrs)
619 @staticmethod
620 def createDataobject(
621 schema: Dataschema, objtype: str, objattrs: dict[str:Any]
622 ) -> DataObject:
623 """Returns instance of specified Dataobject type from specified attributes"""
624 return schema.objectTypes[objtype](from_json_dict=objattrs)
626 @staticmethod
627 def getUpdatedObject(obj: DataObject, objattrs: dict[str, Any]) -> DataObject:
628 """Return a deepcopy of specified obj, with its attributes updated upon
629 specified objattrs dict from Event"""
630 newobj = deepcopy(obj)
632 # Update newobj attributes
633 for attrname, value in objattrs["added"].items():
634 setattr(newobj, attrname, value) # Add new attributes
635 for attrname, value in objattrs["modified"].items():
636 setattr(newobj, attrname, value) # Update existing attributes
637 for attrname, value in objattrs["removed"].items():
638 if hasattr(newobj, attrname):
639 delattr(newobj, attrname) # Delete existing attributes
641 return newobj
643 def convertDataObjectToLocal(self, obj: DataObject) -> DataObject:
644 """Convert specified Dataobject (remote) to local one"""
645 tmpEvent = self.convertEventToLocal(
646 Event("conversion", "added", obj, obj.toNative())
647 )
648 return self.local_schema.objectTypes[self.typesmapping[obj.getType()]](
649 from_json_dict=tmpEvent.objattrs
650 )
652 def convertDataObjectListToLocal(
653 self, remoteobjtype: str, objlist: DataObjectList
654 ) -> DataObjectList:
655 """Convert specified DataObjectList (remote) to local one"""
656 localobjs = [self.convertDataObjectToLocal(obj) for obj in objlist]
657 return self.local_schema.objectlistTypes[self.typesmapping[remoteobjtype]](
658 objlist=localobjs
659 )
    @staticmethod
    def purgeOldCacheFiles(
        objtypes: list[str] | set[str],
        cacheFilePrefix: str = "",
        cacheFileSuffix: str = "",
    ):
        """Delete all cache files of specified objtypes.

        For each objtype, four cache file families are removed: the main cache,
        the "complete" cache, and the trashbin variant of each.
        """
        for objtype in objtypes:
            # Main cache
            LocalCache.deleteAllCacheFiles(
                f"{cacheFilePrefix}{objtype}{cacheFileSuffix}"
            )
            # "Complete" (error-free view) cache
            LocalCache.deleteAllCacheFiles(
                f"{cacheFilePrefix}{objtype}_complete__{cacheFileSuffix}"
            )
            # Trashbin caches
            LocalCache.deleteAllCacheFiles(
                f"{cacheFilePrefix}trashbin_{objtype}{cacheFileSuffix}"
            )
            LocalCache.deleteAllCacheFiles(
                f"{cacheFilePrefix}trashbin_{objtype}_complete__{cacheFileSuffix}"
            )
    def _fillDatamodelDict(self):
        """Build self._datamodel from the raw config: copy each objtype's
        settings, compiling the 'toString' Jinja template and validating that it
        only references declared local attributes."""
        # Fill the datamodel dict
        self._datamodel = {}
        for objtype in self._config["hermes-client"]["datamodel"]:
            self._datamodel[objtype] = {}
            for k, v in self._config["hermes-client"]["datamodel"][objtype].items():
                if k == "attrsmapping":
                    # Copy so later template compilation doesn't mutate the config
                    self._datamodel[objtype][k] = dict(v)
                elif k == "toString" and v is not None:
                    # Compile toString's jinja template
                    jinjavars = set()
                    self._datamodel[objtype][k] = Jinja.compileIfJinjaTemplate(
                        v,
                        jinjavars,
                        self._jinjaenv,
                        f"hermes-client.datamodel.{objtype}.toString",
                        False,
                        False,
                    )
                    # Ensure jinja vars are known local attrs
                    unknownattrs = (
                        jinjavars
                        - self._config["hermes-client"]["datamodel"][objtype][
                            "attrsmapping"
                        ].keys()
                    )
                    if unknownattrs:
                        raise HermesUnknownVarsInJinjaTemplateError(
                            "Unknown attributes met in 'hermes-client.datamodel"
                            f".{objtype}.toString' jinja template: {unknownattrs}"
                        )
                else:
                    # Every other setting is copied verbatim
                    self._datamodel[objtype][k] = v
    def _fillConversionVars(self):
        """Build self.typesmapping and self._remote2local from self._datamodel
        and the remote schema, compiling each attrsmapping Jinja template, and
        collect unknown remote types/attributes for consistency reporting."""
        # Fill the types mapping (remote as key, local as value)
        typesmapping = {v["hermesType"]: k for k, v in self._datamodel.items()}

        # Types consistency check
        self.unknownRemoteTypes = typesmapping.keys() - self.remote_schema.schema.keys()
        if self.unknownRemoteTypes:
            # Drop types that don't exist in the remote schema
            for t in self.unknownRemoteTypes:
                del typesmapping[t]

        # Reorder typemapping to respect the order specified on remote schema
        self.typesmapping = {}
        for rtype in self.remote_schema.schema:
            if rtype in typesmapping:
                self.typesmapping[rtype] = typesmapping[rtype]

        # Fill the remote2local mapping dict
        for objsettings in self._datamodel.values():
            remote_objtype = objsettings["hermesType"]
            self._remote2local[remote_objtype] = {}
            # Add primary keys to mapping to ensure they're always available
            if remote_objtype in self.remote_schema.schema:
                pkeys = self.remote_schema.schema[remote_objtype][
                    "PRIMARYKEY_ATTRIBUTE"
                ]
                if type(pkeys) not in [list, tuple]:
                    # Single-attribute primary key: normalize to a list
                    pkeys = [pkeys]
                for pkey in pkeys:
                    # Local pkey attributes carry the "_pkey_" prefix
                    objsettings["attrsmapping"][f"_pkey_{pkey}"] = pkey

            for local_attr, remote_attr in objsettings["attrsmapping"].items():
                remote_vars = set()
                objsettings["attrsmapping"][local_attr] = Jinja.compileIfJinjaTemplate(
                    remote_attr,
                    remote_vars,
                    self._jinjaenv,
                    f"hermes-client.datamodel.{remote_objtype}.attrsmapping",
                    False,
                    False,
                )
                if len(remote_vars) == 0:
                    # Hack to handle Jinja templates containing only static data
                    remote_vars.add(None)

                for remote_var in remote_vars:
                    # As many local attrs can be mapped on a same remote attr,
                    # store the mapping in a list
                    self._remote2local[remote_objtype].setdefault(
                        remote_var, []
                    ).append(local_attr)

        # Attributes consistency check
        self.unknownRemoteAttributes = {}
        for rtype in self.typesmapping:
            diff = (
                self._remote2local[rtype].keys()
                - self.remote_schema.schema[rtype]["HERMES_ATTRIBUTES"]
                - set([None])  # Ignore Jinja templates with static data only
            )
            if diff:
                self.unknownRemoteAttributes[rtype] = diff
    def _setupLocalSchema(self) -> Dataschema:
        """Build local schema from local datamodel and remote schema.

        For each mapped type: translates remote secrets attributes, primary key
        and foreign keys to their local ("_pkey_"-prefixed) counterparts."""
        rschema: dict[str, Any] = self.remote_schema.schema
        schema: dict[str, Any] = {}
        for objtype in self.typesmapping.values():
            remote_objtype = self._datamodel[objtype]["hermesType"]

            # Map each remote secrets attribute to its local attribute name(s)
            secrets = []
            for attr in rschema[remote_objtype]["SECRETS_ATTRIBUTES"]:
                v = self._remote2local[remote_objtype].get(attr)
                if v is not None:
                    secrets.extend(v)

            # Add primary keys to mapping to ensure they're always there
            pkeys = self.remote_schema.schema[remote_objtype]["PRIMARYKEY_ATTRIBUTE"]
            if type(pkeys) in [list, tuple]:
                pkey = tuple([f"_pkey_{pkey}" for pkey in pkeys])
            else:
                pkey = f"_pkey_{pkeys}"

            # Convert foreign keys dict
            fkeys: dict[str, list[str]] = {}
            for from_attr, (to_obj, to_attr) in self.remote_schema.schema[
                remote_objtype
            ]["FOREIGN_KEYS"].items():
                # In current implementation, foreign key are always
                # single primary keys (not a tuple)
                fkeys[f"_pkey_{from_attr}"] = [
                    self.typesmapping[to_obj],
                    f"_pkey_{to_attr}",
                ]

            schema[objtype] = {
                "HERMES_ATTRIBUTES": set(
                    self._datamodel[objtype]["attrsmapping"].keys()
                ),
                "SECRETS_ATTRIBUTES": set(secrets),
                "CACHEONLY_ATTRIBUTES": set(),
                "LOCAL_ATTRIBUTES": set(),
                "PRIMARYKEY_ATTRIBUTE": pkey,
                "FOREIGN_KEYS": fkeys,
                "TOSTRING": self._datamodel[objtype]["toString"],
            }

        res = Dataschema(schema)
        return res
825 @staticmethod
826 def getObjectFromCacheOrTrashbin(
827 ds: Datasource, objtype: str, objpkey: Any
828 ) -> tuple[DataObjectList | None, DataObject | None]:
829 """Look for objpkey in maincache and trashbin of objtype in specified ds.
830 If found, returns a tuple with a the DataObjectList where the object is, and the
831 object itself. Otherwise, returns (None, None)"""
832 src: DataObjectList
833 obj: DataObject | None
834 for src in (ds[objtype], ds[f"trashbin_{objtype}"]):
835 obj = src.get(objpkey)
836 if obj is not None:
837 return (src, obj)
838 return (None, None)