Coverage for clients / __init__.py: 83%
890 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:08 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:08 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from clients.datamodel import Datamodel, InvalidDataError
24from lib.config import HermesConfig
25from lib.version import HERMES_VERSION
26from lib.datamodel.dataobject import DataObject
27from lib.datamodel.dataobjectlist import DataObjectList
28from lib.datamodel.datasource import Dataschema, Datasource
29from lib.datamodel.diffobject import DiffObject
30from lib.datamodel.event import Event
31from lib.datamodel.serialization import LocalCache, JSONEncoder
32from lib.plugins import AbstractMessageBusConsumerPlugin
33from lib.utils.mail import Email
34from lib.utils.socket import (
35 SockServer,
36 SocketMessageToServer,
37 SocketMessageToClient,
38 SocketArgumentParser,
39 SocketParsingError,
40 SocketParsingMessage,
41)
43from copy import deepcopy
44from datetime import datetime, timedelta
45from time import sleep
46from types import FrameType
47from typing import Any
48import argparse
49import json
50import signal
51import traceback
class HermesAlreadyNotifiedException(Exception):
    """Raised when an exception has already been notified, so that callers can
    skip sending a second notification for the same error."""
59class HermesClientHandlerError(Exception):
60 """Raised when an exception is met during client handler call"""
62 def __init__(self, err: Exception | str | None):
63 if isinstance(err, Exception):
64 self.msg: str | None = HermesClientHandlerError.exceptionToString(err)
65 """Printable error message, None in a rare case where exception is raised
66 without error, but to postpone an event processing"""
67 else:
68 self.msg = err
69 super().__init__(self.msg)
71 @staticmethod
72 def exceptionToString(
73 exception: Exception, purgeCurrentFileFromTrace: bool = True
74 ) -> str:
75 """Convert the specified exception to a string containing its full trace"""
76 lines = traceback.format_exception(
77 type(exception), exception, exception.__traceback__
78 )
80 if purgeCurrentFileFromTrace:
81 # Purging current file infos from traceback
82 lines = [line for line in lines if __file__ not in line]
84 return "".join(lines).strip()
class HermesClientCache(LocalCache):
    """Hermes client data to cache"""

    def __init__(self, from_json_dict: dict[str, Any] | None = None):
        """Set up cached attributes, optionally restored from a JSON dict.

        The default is None instead of a mutable `{}` literal: a shared dict
        default would be a single object reused across every call.
        """
        if from_json_dict is None:
            from_json_dict = {}

        super().__init__(
            jsondataattr=[
                "queueErrors",
                "datamodelWarnings",
                "exception",
                "initstartoffset",
                "initstopoffset",
                "nextoffset",
            ],
        )

        self.queueErrors: dict[str, str] = from_json_dict.get("queueErrors", {})
        """Dictionary containing current objects in error queue, for notifications"""

        self.datamodelWarnings: dict[str, dict[str, dict[str, Any]]] = (
            from_json_dict.get("datamodelWarnings", {})
        )
        """Dictionary containing current datamodel warnings, for notifications"""

        self.exception: str | None = from_json_dict.get("exception")
        """String containing latest exception trace"""

        self.initstartoffset: Any | None = from_json_dict.get("initstartoffset")
        """Contains the offset of the first message of initSync sequence on message
        bus"""

        self.initstopoffset: Any | None = from_json_dict.get("initstopoffset")
        """Contains the offset of the last message of initSync sequence on message
        bus"""

        self.nextoffset: Any | None = from_json_dict.get("nextoffset")
        """Contains the offset of the next message to process on message bus"""

    def savecachefile(self, cacheFilename: str | None = None):
        """Override method only to disable backup files in cache"""
        return super().savecachefile(cacheFilename, dontKeepBackup=True)
127class GenericClient:
128 """Superclass of all hermes-client implementations.
129 Manage all the internals of hermes for a client: datamodel updates, caching, error
130 management, trashbin and converting messages from message bus into events handlers
131 calls"""
133 __FOREIGNKEYS_POLICIES: dict[str, tuple[str]] = {
134 "disabled": tuple(),
135 "on_remove_event": ("removed",),
136 "on_every_event": ("added", "modified", "removed"),
137 }
138 """Different foreignkeys_policy settings : associate each foreignkeys_policy
139 (as key) with the list of event types that will be placed in the error queue if the
140 object concerning them is the parent (by foreign key) of an object already present
141 in the error queue"""
    def __init__(self, config: HermesConfig):
        """Instantiate a new client"""
        __hermes__.logger.info(f"Starting {config['appname']} v{HERMES_VERSION}")

        # Setup the signals handler
        config.setSignalsHandler(self.__signalHandler)

        self.__config: HermesConfig = config
        """Current config"""
        try:
            self.config: dict[str, Any] = self.__config[self.__config["appname"]]
            """Dict containing the client plugin configuration"""
        except KeyError:
            # No config section named after the app: the plugin has no settings
            self.config = {}

        self.__previousconfig: HermesConfig = HermesConfig.loadcachefile(
            "_hermesconfig"
        )
        """Previous config (from cache)"""

        self.__msgbus: AbstractMessageBusConsumerPlugin = self.__config["hermes"][
            "plugins"
        ]["messagebus"]["plugininstance"]
        # NOTE(review): the *1000 suggests updateInterval is in seconds and the
        # bus timeout in milliseconds — TODO confirm against the plugin API
        self.__msgbus.setTimeout(
            self.__config["hermes-client"]["updateInterval"] * 1000
        )

        self.__cache: HermesClientCache = HermesClientCache.loadcachefile(
            f"_{self.__config['appname']}"
        )
        """Cached attributes"""
        self.__cache.setCacheFilename(f"_{self.__config['appname']}")

        self.__startTime: datetime | None = None
        """Datetime when mainloop was started"""

        self.__isPaused: datetime | None = None
        """Contains pause datetime if standard processing is paused, None otherwise"""

        self.__isStopped: bool = False
        """mainloop() will run until this var is set to True"""

        self.__numberOfLoopToProcess: int | None = None
        """**For functionnal tests only**, if a value is set, will process for *value*
        iterations of mainloop and pause execution until a new positive value is set"""

        self.__sock: SockServer | None = None
        """Facultative socket to allow cli communication"""
        if (
            self.__config["hermes"]["cli_socket"]["path"] is not None
            or self.__config["hermes"]["cli_socket"]["dont_manage_sockfile"] is not None
        ):
            self.__sock = SockServer(
                path=self.__config["hermes"]["cli_socket"]["path"],
                owner=self.__config["hermes"]["cli_socket"]["owner"],
                group=self.__config["hermes"]["cli_socket"]["group"],
                mode=self.__config["hermes"]["cli_socket"]["mode"],
                processHdlr=self.__processSocketMessage,
                dontManageSockfile=self.__config["hermes"]["cli_socket"][
                    "dont_manage_sockfile"
                ],
            )
            self.__setupSocketParser()

        self.__useFirstInitsyncSequence: bool = self.__config["hermes-client"][
            "useFirstInitsyncSequence"
        ]
        """Indicate if we prefer using the first/oldest or last/most recent
        initsync sequence available on message bus"""

        self.__newdatamodel: Datamodel = Datamodel(config=self.__config)
        """New datamodel (from current config)"""

        if self.__previousconfig.hasData():
            # Start app with previous datamodel to be able to check differences with
            # new one through self.__processDatamodelUpdate()
            self.__datamodel: Datamodel = Datamodel(config=self.__previousconfig)
            """Current datamodel"""
        else:
            # As no previous datamodel is available, start app with new one
            self.__datamodel = self.__newdatamodel

        self.__trashbin_retention: timedelta | None = None
        """Timedelta with delay to keep removed data in trashbin before permanently
        deleting it. 'None' means no trashbin"""
        if self.__config["hermes-client"]["trashbin_retention"] > 0:
            self.__trashbin_retention = timedelta(
                days=self.__config["hermes-client"]["trashbin_retention"]
            )

        self.__trashbin_purgeInterval: timedelta | None = timedelta(
            minutes=self.__config["hermes-client"]["trashbin_purgeInterval"]
        )
        """Timedelta with delay between two trashbin purge attempts"""

        # datetime.min-like sentinel: guarantees the first purge attempt is
        # never considered "too early"
        self.__trashbin_lastpurge: datetime = datetime(year=1, month=1, day=1)
        """Datetime when latest trashbin purge was ran"""

        self.__errorQueue_retryInterval: timedelta | None = timedelta(
            minutes=self.__config["hermes-client"]["errorQueue_retryInterval"]
        )
        """Timedelta with delay between two attempts of processing events in error"""

        # Same sentinel as above, for the error queue retry scheduler
        self.__errorQueue_lastretry: datetime = datetime(year=1, month=1, day=1)
        """Datetime when latest error queue retry was ran"""

        self.__currentStep: int = 0
        """Store the step number of current event processing. Will be stored in events
        in error queue to allow clients to resume an event where it has failed"""

        self.__isPartiallyProcessed: bool = False
        """Store if some data has been processed during current event processing. Will
        be stored in events in error queue to handle autoremediation properly"""

        self.__isAnErrorRetry: bool = False
        """Indicate to handler whether the current event is being processed as part of
        an error retry"""

        self.__saveRequired: bool = False
        """Reset to False at each loop start, and if any change is made during
        processing, set to True in order to save all cache at the loop end.
        Used to avoid expensive .save() calls when unnecessary
        """

        self.__foreignkeys_events: tuple[str] = self.__FOREIGNKEYS_POLICIES[
            self.__config["hermes-client"]["foreignkeys_policy"]
        ]
        """List of event types that will be placed in the error queue if the object
        concerning them is the parent (by foreign key) of an object already present in
        the error queue"""
275 def getObjectFromCache(self, objtype: str, objpkey: Any) -> DataObject:
276 """Returns a deepcopy of an object from cache.
277 Raise IndexError if objtype is invalid, or if objpkey is not found
278 """
279 ds: Datasource = self.__datamodel.localdata
281 _, obj = Datamodel.getObjectFromCacheOrTrashbin(ds, objtype, objpkey)
282 if obj is None:
283 raise IndexError(
284 f"No object of {objtype=} with {objpkey=} was found in cache"
285 )
287 return deepcopy(obj)
289 def getDataobjectlistFromCache(self, objtype: str) -> DataObjectList:
290 """Returns cache of specified objtype, by reference.
291 WARNING: Any modification of the cache content will mess up your client !!!
292 Raise IndexError if objtype is invalid
293 """
294 ds: Datasource = self.__datamodel.localdata
295 cache = ds[objtype]
296 trashbin = ds[f"trashbin_{objtype}"]
298 # Create an empty DataObjectList of same type as cache
299 res = type(cache)(objlist=[])
301 res.extend(cache)
302 res.extend(trashbin)
303 return res
305 def __signalHandler(self, signalnumber: int, frame: FrameType | None):
306 """Signal handler that will be called on SIGINT and SIGTERM"""
307 __hermes__.logger.critical(
308 f"Signal '{signal.strsignal(signalnumber)}' received, terminating"
309 )
310 self.__isStopped = True
312 def __setupSocketParser(self):
313 """Set up the argparse context for unix socket commands"""
314 self.__parser = SocketArgumentParser(
315 prog=f"{self.__config['appname']}-cli",
316 description=f"Hermes client {self.__config['appname']} CLI",
317 exit_on_error=False,
318 )
320 subparsers = self.__parser.add_subparsers(help="Sub-commands")
322 # Quit
323 sp_quit = subparsers.add_parser("quit", help=f"Stop {self.__config['appname']}")
324 sp_quit.set_defaults(func=self.__sock_quit)
326 # Pause
327 sp_pause = subparsers.add_parser(
328 "pause", help="Pause processing until 'resume' command is sent"
329 )
330 sp_pause.set_defaults(func=self.__sock_pause)
332 # Resume
333 sp_resume = subparsers.add_parser(
334 "resume", help="Resume processing that has been paused with 'pause'"
335 )
336 sp_resume.set_defaults(func=self.__sock_resume)
338 # Status
339 sp_status = subparsers.add_parser(
340 "status", help=f"Show {self.__config['appname']} status"
341 )
342 sp_status.set_defaults(func=self.__sock_status)
343 sp_status.add_argument(
344 "-j",
345 "--json",
346 action="store_const",
347 const=True,
348 default=False,
349 help="Print status as json",
350 )
351 sp_status.add_argument(
352 "-v",
353 "--verbose",
354 action="store_const",
355 const=True,
356 default=False,
357 help="Output items without values",
358 )
360 def __processSocketMessage(
361 self, msg: SocketMessageToServer
362 ) -> SocketMessageToClient:
363 """Handler that process specified msg received on unix socket and returns the
364 answer to send"""
365 reply: SocketMessageToClient | None = None
367 try:
368 args = self.__parser.parse_args(msg.argv)
369 if "func" not in args:
370 raise SocketParsingMessage(self.__parser.format_help())
371 except (SocketParsingError, SocketParsingMessage) as e:
372 retmsg = str(e)
373 except argparse.ArgumentError as e:
374 retmsg = self.__parser.format_error(str(e))
375 else:
376 try:
377 reply = args.func(args)
378 except Exception as e:
379 lines = traceback.format_exception(type(e), e, e.__traceback__)
380 trace = "".join(lines).strip()
381 __hermes__.logger.critical(f"Unhandled exception: {trace}")
382 retmsg = trace
384 if reply is None: # Error was met
385 reply = SocketMessageToClient(retcode=1, retmsg=retmsg)
387 return reply
389 def __sock_quit(self, args: argparse.Namespace) -> SocketMessageToClient:
390 """Handler called when quit subcommand is requested on unix socket"""
391 self.__isStopped = True
392 __hermes__.logger.info(f"{self.__config['appname']} has been requested to quit")
393 return SocketMessageToClient(retcode=0, retmsg="")
395 def __sock_pause(self, args: argparse.Namespace) -> SocketMessageToClient:
396 """Handler called when pause subcommand is requested on unix socket"""
397 if self.__isStopped:
398 return SocketMessageToClient(
399 retcode=1,
400 retmsg=f"Error: {self.__config['appname']} is currently being stopped",
401 )
403 if self.__isPaused:
404 return SocketMessageToClient(
405 retcode=1,
406 retmsg=f"Error: {self.__config['appname']} is already paused",
407 )
409 __hermes__.logger.info(
410 f"{self.__config['appname']} has been requested to pause"
411 )
412 self.__isPaused = datetime.now()
413 return SocketMessageToClient(retcode=0, retmsg="")
415 def __sock_resume(self, args: argparse.Namespace) -> SocketMessageToClient:
416 """Handler called when resume subcommand is requested on unix socket"""
417 if self.__isStopped:
418 return SocketMessageToClient(
419 retcode=1,
420 retmsg=f"Error: {self.__config['appname']} is currently being stopped",
421 )
423 if not self.__isPaused:
424 return SocketMessageToClient(
425 retcode=1, retmsg=f"Error: {self.__config['appname']} is not paused"
426 )
428 __hermes__.logger.info(
429 f"{self.__config['appname']} has been requested to resume"
430 )
431 self.__isPaused = None
432 return SocketMessageToClient(retcode=0, retmsg="")
    def __sock_status(self, args: argparse.Namespace) -> SocketMessageToClient:
        """Handler called when status subcommand is requested on unix socket.

        Returns the status either as indented JSON (-j/--json) or as an
        indented human-readable text listing, grouped by category."""
        status = self.__status(verbose=args.verbose)
        if args.json:
            msg = json.dumps(status, cls=JSONEncoder, indent=4)
        else:
            nl = "\n"
            # Optional mapping to prettify info names; currently empty so
            # names are printed as-is
            info2printable = {}
            msg = ""
            # Always print the app's own entry first, then the other entries
            for objname in [self.__config["appname"]] + list(
                status.keys() - (self.__config["appname"],)
            ):
                infos = status[objname]
                msg += f"{objname}:{nl}"
                for category in ("information", "warning", "error"):
                    if category not in infos:
                        continue
                    if not infos[category]:
                        # Empty category: print it on a single line
                        msg += f"  * {category.capitalize()}: []{nl}"
                        continue

                    msg += f"  * {category.capitalize()}{nl}"
                    for infoname, infodata in infos[category].items():
                        # Indent multiline values so they align under their name
                        indentedinfodata = str(infodata).replace("\n", "\n      ")
                        msg += (
                            f"    - {info2printable.get(infoname, infoname)}:"
                            f" {indentedinfodata}{nl}"
                        )
            msg = msg.rstrip()

        return SocketMessageToClient(retcode=0, retmsg=msg)
    @property
    def currentStep(self) -> int:
        """Step number of the event currently being processed (read/write).
        Stored in events placed in the error queue, so that clients can resume
        an event at the step where it previously failed"""
        return self.__currentStep
    @currentStep.setter
    def currentStep(self, value: int):
        """Set the current step number. Raises TypeError if value is not a
        strict int, ValueError if it is negative."""
        # Strict type check: bool is an int subclass but has a distinct type,
        # so booleans are rejected too
        if type(value) is not int:
            raise TypeError(
                f"Specified step {value=} has invalid type '{type(value)}'"
                " instead of int"
            )

        if value < 0:
            raise ValueError(f"Specified step {value=} must be greater or equal to 0")

        self.__currentStep = value
    @property
    def isPartiallyProcessed(self) -> bool:
        """Indicate if some data has been processed during current event
        processing (read/write). Stored in events placed in the error queue;
        required to handle autoremediation properly"""
        return self.__isPartiallyProcessed
    @isPartiallyProcessed.setter
    def isPartiallyProcessed(self, value: bool):
        """Set the partially-processed flag. Raises TypeError if value is not
        a strict bool."""
        # Strict type check: only genuine booleans are accepted
        if type(value) is not bool:
            raise TypeError(
                f"Specified isPartiallyProcessed {value=} has invalid type"
                f" '{type(value)}' instead of bool"
            )

        self.__isPartiallyProcessed = value
    @property
    def isAnErrorRetry(self) -> bool:
        """Read-only attribute that indicates to handler whether the current
        event is being processed as part of an error retry (set internally by
        the error queue retry loop)"""
        return self.__isAnErrorRetry
    def mainLoop(self):
        """Client main loop.

        Repeatedly consumes the message bus: once initialized, retries the
        error queue, purges the trashbin and processes standard events; until
        then, looks for an initsync sequence to process. Exceptions are
        notified rather than propagated, and caches are saved at the end of
        each iteration when needed. Runs until __isStopped becomes True."""
        self.__startTime = datetime.now()

        self.__checkDatamodelWarnings()
        # TODO: implement a check to ensure subclasses required data types and
        # attributes exist in datamodel

        if self.__datamodel.hasRemoteSchema():
            __hermes__.logger.debug(
                "Remote Dataschema in cache:"
                f" {self.__datamodel.remote_schema.to_json()=}"
            )
            # The error queue format depends on the schema, so it can only be
            # loaded once a remote schema is known
            self.__datamodel.loadErrorQueue()
        else:
            __hermes__.logger.debug("No remote Dataschema in cache yet")

        if self.__sock is not None:
            self.__sock.startProcessMessagesDaemon(appname=__hermes__.appname)

        # Reduce sleep duration during functional tests to speed them up
        sleepDuration = 1 if self.__numberOfLoopToProcess is None else 0.05

        isFirstLoopIteration: bool = True
        while not self.__isStopped:
            self.__saveRequired = False
            try:
                with self.__msgbus:
                    if self.__isPaused or self.__numberOfLoopToProcess == 0:
                        # Paused (cli) or waiting for tests to request loops
                        sleep(sleepDuration)
                        continue

                    try:
                        if self.__hasAlreadyBeenInitialized():
                            if isFirstLoopIteration:
                                # First iteration after a (re)start: apply any
                                # datamodel change from the new config
                                try:
                                    self.__processDatamodelUpdate()
                                except Exception as e:
                                    self.__notifyFatalException(
                                        HermesClientHandlerError.exceptionToString(
                                            e, purgeCurrentFileFromTrace=False
                                        )
                                    )
                                    # Already notified: skip the generic
                                    # exception notification below
                                    raise HermesAlreadyNotifiedException
                            self.__retryErrorQueue()
                            self.__emptyTrashBin()
                            self.__processEvents(isInitSync=False)
                        else:
                            __hermes__.logger.info(
                                "Client hasn't ran its first initsync sequence yet"
                            )
                            if self.__canBeInitialized():
                                __hermes__.logger.info(
                                    "First initsync sequence processing begins"
                                )
                                self.__processEvents(isInitSync=True)
                                if self.__hasAlreadyBeenInitialized():
                                    __hermes__.logger.info(
                                        "First initsync sequence processing completed"
                                    )
                            else:
                                __hermes__.logger.info(
                                    "No initsync sequence is available on message bus."
                                    " Retry..."
                                )

                        # Clears any previously notified exception
                        self.__notifyException(None)
                    except HermesAlreadyNotifiedException:
                        pass
                    except InvalidDataError as e:
                        # Invalid data is fatal: the client cannot recover
                        self.__notifyFatalException(
                            HermesClientHandlerError.exceptionToString(
                                e, purgeCurrentFileFromTrace=False
                            )
                        )
                    except Exception as e:
                        self.__notifyException(
                            HermesClientHandlerError.exceptionToString(
                                e, purgeCurrentFileFromTrace=False
                            )
                        )
                    finally:
                        isFirstLoopIteration = False
                        # Still could be True if an exception was raised in
                        # __retryErrorQueue()
                        self.__isAnErrorRetry = False
            except Exception as e:
                __hermes__.logger.warning(
                    "Message bus seems to be unavailable."
                    " Waiting 60 seconds before retrying"
                )
                self.__notifyException(
                    HermesClientHandlerError.exceptionToString(
                        e, purgeCurrentFileFromTrace=False
                    )
                )
                # Wait one second 60 times to avoid waiting too long before stopping
                for i in range(60):
                    if self.__isStopped:
                        break
                    sleep(1)
            finally:
                if self.__saveRequired or self.__isStopped:
                    self.__datamodel.saveErrorQueue()
                    if self.__hasAtLeastBeganInitialization():
                        self.__datamodel.saveLocalAndRemoteData()
                    # Reset current step for "on_save" event
                    self.currentStep = 0
                    self.isPartiallyProcessed = False
                    # Call special event "on_save()"
                    self.__callHandler("", "save")
                    self.__notifyQueueErrors()
                    self.__cache.savecachefile()

                    if self.__isStopped:
                        # Only to ensure cache files version update is saved,
                        # to avoid version migrations at each restart,
                        # as those files aren't expected to be updated often
                        if (
                            self.__hasAtLeastBeganInitialization()
                            and self.__cache.nextoffset is not None
                            and self.__cache.nextoffset > self.__cache.initstartoffset
                        ):
                            # Do not save until a first event has been properly handled
                            self.__config.savecachefile()
                            self.__datamodel.remote_schema.savecachefile()

                # Only used in functionnal tests
                if self.__numberOfLoopToProcess:
                    self.__numberOfLoopToProcess -= 1
    def __retryErrorQueue(self):
        """Retry processing the events stored in the error queue.

        Runs at most once per errorQueue_retryInterval. Iterates over the
        queue repeatedly: events that are still a dependency of another error
        are skipped, successfully processed events are removed, and failed
        ones get their step/partial-processing state updated. The outer loop
        re-runs on skipped events as long as some progress is being made."""
        # Enforce retryInterval
        now = datetime.now()
        if now < self.__errorQueue_lastretry + self.__errorQueue_retryInterval:
            return  # Too early to process again

        # All events processed in __retryErrorQueue() require this attribute to be True
        self.__isAnErrorRetry = True

        done = False
        evNumbersToRetry: list[int] = []
        eventNumber: int
        localEvent: Event
        remoteEvent: Event | None
        while not done:
            retryQueue: list[int] = []
            previousKeys = self.__datamodel.errorqueue.keys()

            for (
                eventNumber,
                remoteEvent,
                localEvent,
                errorMsg,
            ) in self.__datamodel.errorqueue:
                if evNumbersToRetry and eventNumber not in evNumbersToRetry:
                    # Ignore eventNumber absent from evNumbersToRetry,
                    # excepted on first iteration of while loop
                    continue

                if remoteEvent is not None:
                    # Remote event (came from the message bus)
                    if self.__datamodel.errorqueue.isEventAParentOfAnotherError(
                        remoteEvent, False
                    ):
                        __hermes__.logger.info(
                            f"Won't retry remote event {remoteEvent} from error queue"
                            " as it is still a dependency of another error"
                        )
                        retryQueue.append(eventNumber)
                        continue

                    __hermes__.logger.info(
                        f"Retrying to process remote event {remoteEvent} from error"
                        " queue"
                    )
                    try:
                        self.__processRemoteEvent(
                            remoteEvent, localEvent, enqueueEventWithError=False
                        )
                    except HermesClientHandlerError as e:
                        __hermes__.logger.info(
                            f"... failed on step {self.currentStep}: {str(e)}"
                        )
                        # Save resume point for the next retry attempt
                        remoteEvent.step = self.currentStep
                        remoteEvent.isPartiallyProcessed = self.isPartiallyProcessed
                        localEvent.step = self.currentStep
                        localEvent.isPartiallyProcessed = self.isPartiallyProcessed
                        self.__datamodel.errorqueue.updateErrorMsg(eventNumber, e.msg)
                    else:
                        # If event has suppressed object, eventNumber has already been
                        # purged from queue
                        self.__datamodel.errorqueue.remove(
                            eventNumber, ignoreMissingEventNumber=True
                        )
                else:
                    # Local-only event (generated by the client's datamodel)
                    if self.__datamodel.errorqueue.isEventAParentOfAnotherError(
                        localEvent, True
                    ):
                        __hermes__.logger.info(
                            f"Won't retry local event {localEvent} from error queue"
                            " as it is still a dependency of another error"
                        )
                        retryQueue.append(eventNumber)
                        continue
                    __hermes__.logger.info(
                        f"Retrying to process local event {localEvent} from error queue"
                    )
                    try:
                        self.__processLocalEvent(
                            remoteEvent, localEvent, enqueueEventWithError=False
                        )
                    except HermesClientHandlerError as e:
                        __hermes__.logger.info(
                            f"... failed on step {self.currentStep}: {str(e)}"
                        )
                        localEvent.step = self.currentStep
                        localEvent.isPartiallyProcessed = self.isPartiallyProcessed
                        self.__datamodel.errorqueue.updateErrorMsg(eventNumber, e.msg)
                    else:
                        # If event has suppressed object, eventNumber has already been
                        # purged from queue
                        self.__datamodel.errorqueue.remove(
                            eventNumber, ignoreMissingEventNumber=True
                        )
                while self.__isPaused and not self.__isStopped:
                    sleep(1)  # Allow loop to be paused
                if self.__isStopped:
                    break  # Allow loop to be interrupted if requested
            else:
                # Update __errorQueue_lastretry only if loop hasn't been interrupted
                self.__errorQueue_lastretry = now

            # Done when nothing changed in the queue (no progress possible) or
            # when no event was deferred to a later pass
            done = previousKeys == self.__datamodel.errorqueue.keys() or not retryQueue
            if done:
                if previousKeys:
                    __hermes__.logger.debug(
                        f"End of retryerrorqueue {previousKeys=}"
                        f" {self.__datamodel.errorqueue.keys()=} - {retryQueue=}"
                    )
            else:
                __hermes__.logger.debug(
                    "As some event have been processed, will retry ignored events"
                    f" {retryQueue}"
                )
                evNumbersToRetry = retryQueue.copy()

        self.__isAnErrorRetry = False  # End of __retryErrorQueue()
    def __emptyTrashBin(self, force: bool = False):
        """Permanently remove expired objects from the trashbin.

        Runs at most once per trashbin_purgeInterval unless force is True.
        Objects older than trashbin_retention (or all trashed objects when no
        retention is set) are removed by emitting a "removed" event."""
        # Enforce purgeInterval
        now = datetime.now()
        if (
            not force
            and now < self.__trashbin_lastpurge + self.__trashbin_purgeInterval
        ):
            return  # Too early to process again
        if self.__trashbin_retention is not None:
            retentionLimit = datetime.now() - self.__trashbin_retention

        objtype: str
        objs: DataObjectList
        # As we'll remove objects, process data in the datamodel reversed declaration
        # order
        for objtype, objs in reversed(self.__datamodel.remotedata.items()):
            if not objtype.startswith("trashbin_"):
                continue

            for pkey in objs.getPKeys():
                obj = objs.get(pkey)
                if (
                    self.__trashbin_retention is None
                    or obj._trashbin_timestamp < retentionLimit
                ):
                    event = Event(
                        evcategory="base", eventtype="removed", obj=obj, objattrs={}
                    )
                    __hermes__.logger.info(f"Trying to purge {repr(obj)} from trashbin")
                    if self.__datamodel.errorqueue.containsObjectByEvent(
                        event, isLocalEvent=False
                    ) and not self.__datamodel.errorqueue.isEventAParentOfAnotherError(
                        event, isLocalEvent=False
                    ):
                        # Object already in error queue but not a dependency of
                        # another error: best-effort removal, failures ignored
                        try:
                            self.__processRemoteEvent(
                                event, local_event=None, enqueueEventWithError=False
                            )
                        except HermesClientHandlerError:
                            pass
                    else:
                        # Normal removal, failures go to the error queue
                        self.__processRemoteEvent(
                            event, local_event=None, enqueueEventWithError=True
                        )

                while self.__isPaused and not self.__isStopped:
                    sleep(1)  # Allow loop to be paused
                if self.__isStopped:
                    break  # Allow loop to be interrupted if requested

            while self.__isPaused and not self.__isStopped:
                sleep(1)  # Allow loop to be paused
            if self.__isStopped:
                break  # Allow loop to be interrupted if requested
        else:
            # Update __trashbin_lastpurge only if loop hasn't been interrupted
            self.__trashbin_lastpurge = now
815 def __hasAlreadyBeenInitialized(self) -> bool:
816 if (
817 self.__cache.initstartoffset is None
818 or self.__cache.initstopoffset is None
819 or self.__cache.nextoffset is None
820 or self.__cache.nextoffset < self.__cache.initstopoffset
821 ):
822 return False
823 return True
825 def __hasAtLeastBeganInitialization(self) -> bool:
826 return (
827 self.__cache.initstartoffset is not None
828 and self.__datamodel.hasRemoteSchema()
829 )
    def __canBeInitialized(self) -> bool:
        """Scan the whole message bus for complete initsync sequences and, if
        one is found, store its offsets in cache. Returns True when the client
        can start processing its first initsync sequence, False otherwise."""
        self.__msgbus.seekToBeginning()

        # List of (start, stop) offsets of initsync sequences found
        initSyncFound: list[tuple[Any, Any]] = []

        start = None
        stop = None
        event: Event
        for event in self.__msgbus:
            if event.evcategory != "initsync":
                continue
            if event.eventtype == "init-start":
                start = event.offset
            elif event.eventtype == "init-stop" and start is not None:
                # A complete (start, stop) pair was found
                stop = event.offset
                initSyncFound.append(
                    (start, stop),
                )
                if self.__useFirstInitsyncSequence:
                    break  # We found the first sequence
                else:
                    # Continue to find a new complete sequence
                    start = None
                    stop = None

        if not initSyncFound:
            return False

        if self.__useFirstInitsyncSequence:
            start, stop = initSyncFound[0]
        else:
            start, stop = initSyncFound[-1]

        # Never move nextoffset backwards below the chosen sequence start
        if self.__cache.nextoffset is None or self.__cache.nextoffset < start:
            self.__cache.nextoffset = start

        self.__cache.initstartoffset = start
        self.__cache.initstopoffset = stop
        __hermes__.logger.debug(
            "Init sequence was found in Kafka at offsets"
            f" [{self.__cache.initstartoffset} ; {self.__cache.initstopoffset}]"
        )
        return True
    def __updateSchema(self, newSchema: Dataschema):
        """Replace the current remote Dataschema by newSchema, purging the
        trashbin first when required, then rebuild the datamodel caches so
        events of suppressed types are dropped."""
        if self.__datamodel.forcePurgeOfTrashedObjectsWithoutNewPkeys(
            self.__datamodel.remote_schema, newSchema
        ):
            # Some trashed objects would be inconsistent under the new schema:
            # purge the trashbin now, ignoring the purge interval
            self.__emptyTrashBin(force=True)

        self.__datamodel.updateSchema(newSchema)
        self.__config.savecachefile()  # Save config to be able to rebuild datamodel
        # Save and reload error queue to purge it from events of any suppressed types
        self.__datamodel.saveErrorQueue()
        self.__datamodel.loadErrorQueue()
        self.__checkDatamodelWarnings()
889 def __checkDatamodelWarnings(self):
890 if self.__datamodel.unknownRemoteTypes:
891 __hermes__.logger.warning(
892 "Datamodel errors: remote types"
893 f" '{self.__datamodel.unknownRemoteTypes}'"
894 " don't exist in current Dataschema"
895 )
897 if self.__datamodel.unknownRemoteAttributes:
898 __hermes__.logger.warning(
899 "Datamodel errors: remote attributes don't exist in current"
900 f" Dataschema: {self.__datamodel.unknownRemoteAttributes}"
901 )
902 self.__notifyDatamodelWarnings()
    def __processEvents(self, isInitSync=False):
        """Consume and process messages from the message bus.

        When isInitSync is True, only "initsync" category events are handled,
        from the cached init sequence offsets until the "init-stop" marker;
        otherwise only "base" category events are handled. nextoffset is
        advanced in cache after each processed (or skipped) message."""
        remote_event: Event
        schema: Dataschema | None = None
        evcategory: str = "initsync" if isInitSync else "base"

        self.__msgbus.seek(self.__cache.nextoffset)

        for remote_event in self.__msgbus:
            self.__saveRequired = True
            if isInitSync and remote_event.offset > self.__cache.initstopoffset:
                # Should never be called
                self.__cache.nextoffset = remote_event.offset + 1
                break

            # TODO: implement data consistency check if event.evcategory==initsync
            # and evcategory==base

            if remote_event.evcategory != evcategory:
                # Event of the other category: skip it but keep advancing
                self.__cache.nextoffset = remote_event.offset + 1
                continue

            if isInitSync:
                if remote_event.eventtype == "init-start":
                    # Sequence start carries the Dataschema to apply
                    schema = Dataschema.from_json(remote_event.objattrs)
                    self.__updateSchema(schema)
                    continue

                if remote_event.eventtype == "init-stop":
                    self.__cache.nextoffset = remote_event.offset + 1
                    break

                if schema is None and not self.__hasAtLeastBeganInitialization():
                    # Data events seen before any init-start: sequence is broken
                    msg = "Invalid initsync sequence met, ignoring"
                    __hermes__.logger.critical(msg)
                    return

            # Process "standard" message
            match remote_event.eventtype:
                case "added" | "modified" | "removed":
                    self.__processRemoteEvent(
                        remote_event, local_event=None, enqueueEventWithError=True
                    )
                case "dataschema":
                    schema = Dataschema.from_json(remote_event.objattrs)
                    self.__updateSchema(schema)
                case _:
                    __hermes__.logger.error(
                        "Received an event with unknown type"
                        f" '{remote_event.eventtype}': ignored"
                    )

            self.__cache.nextoffset = remote_event.offset + 1

            while self.__isPaused and not self.__isStopped:
                sleep(1)  # Allow loop to be paused
            if self.__isStopped:
                break  # Allow loop to be interrupted if requested
962 def __processRemoteEvent(
963 self,
964 remote_event: Event | None,
965 local_event: Event | None,
966 enqueueEventWithError: bool,
967 simulateOnly: bool = False,
968 ):
969 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
970 remote_event.objtype
971 )
972 __hermes__.logger.debug(
973 f"__processRemoteEvent({remote_event.toString(secretAttrs)})"
974 )
975 self.__saveRequired = True
977 # In case of modification, try to compose full object in order to provide all
978 # attributes values, in order to render Jinja Template with several vars.
979 # In this specific case, one of the template var may have been modified,
980 # but not the other, so the event attrs are not enough to process the
981 # template rendering
982 r_obj_complete: DataObject | None = None
983 if remote_event.eventtype == "modified":
984 cache_complete, r_cachedobj_complete = (
985 Datamodel.getObjectFromCacheOrTrashbin(
986 self.__datamodel.remotedata_complete,
987 remote_event.objtype,
988 remote_event.objpkey,
989 )
990 )
991 if r_cachedobj_complete is not None:
992 r_obj_complete = Datamodel.getUpdatedObject(
993 r_cachedobj_complete, remote_event.objattrs
994 )
996 # Should be always None, except when called from __retryErrorQueue()
997 # In this case, we have to use the provided local_event, as it may contains
998 # some extra changes stacked by autoremediation
999 if local_event is None:
1000 local_events = self.__datamodel.convertEventToLocal(
1001 remote_event, r_obj_complete
1002 )
1003 else:
1004 local_events = [local_event]
1006 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"]
1007 was_in_trashbin = remote_event.objpkey in trashbin
1009 if not simulateOnly and enqueueEventWithError:
1010 hadErrors = self.__datamodel.errorqueue.containsObjectByEvent(
1011 remote_event, isLocalEvent=False
1012 )
1013 isParent = self.__datamodel.errorqueue.isEventAParentOfAnotherError(
1014 remote_event, isLocalEvent=False
1015 )
1017 for local_event in local_events:
1018 if not simulateOnly and enqueueEventWithError:
1019 if hadErrors or (
1020 isParent and remote_event.eventtype in self.__foreignkeys_events
1021 ):
1022 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1023 remote_event.objtype
1024 )
1025 if hadErrors:
1026 errorMsg = (
1027 "Object in remote event"
1028 f" {remote_event.toString(secretAttrs)} already had"
1029 " unresolved errors: appending event to error queue"
1030 )
1031 else:
1032 errorMsg = (
1033 "Object in remote event"
1034 f" {remote_event.toString(secretAttrs)} is a dependency of"
1035 " an object that already had unresolved errors: appending"
1036 " event to error queue"
1037 )
1038 __hermes__.logger.warning(errorMsg)
1039 self.__processRemoteEvent(
1040 remote_event,
1041 local_event=None,
1042 enqueueEventWithError=False,
1043 simulateOnly=True,
1044 )
1045 if local_event is None:
1046 # Force empty event generation when local_event doesn't change
1047 # anything
1048 l_events = self.__datamodel.convertEventToLocal(
1049 remote_event, r_obj_complete, allowEmptyEvent=True
1050 )
1051 for local_event in l_events:
1052 self.__datamodel.errorqueue.append(
1053 remote_event, local_event, errorMsg
1054 )
1055 else:
1056 self.__datamodel.errorqueue.append(
1057 remote_event, local_event, errorMsg
1058 )
1059 continue
1061 try:
1062 match remote_event.eventtype:
1063 case "added":
1064 if self.__trashbin_retention is not None and was_in_trashbin:
1065 # Object is in trashbin, recycle it
1066 self.__remoteRecycled(
1067 remote_event, local_event, simulateOnly
1068 )
1069 else:
1070 # Add new object
1071 self.__remoteAdded(remote_event, local_event, simulateOnly)
1073 case "modified":
1074 self.__remoteModified(remote_event, local_event, simulateOnly)
1076 case "removed":
1077 # Remove object on any of these conditions:
1078 # - trashbin retention is disabled
1079 # - object is already in trashbin
1080 if self.__trashbin_retention is None or was_in_trashbin:
1081 # Remove object
1082 self.__remoteRemoved(
1083 remote_event, local_event, simulateOnly
1084 )
1085 else:
1086 # Store object in trashbin
1087 self.__remoteTrashed(
1088 remote_event, local_event, simulateOnly
1089 )
1090 except HermesClientHandlerError as e:
1091 if not simulateOnly and enqueueEventWithError:
1092 self.__processRemoteEvent(
1093 remote_event,
1094 local_event=None,
1095 enqueueEventWithError=False,
1096 simulateOnly=True,
1097 )
1098 remote_event.step = self.currentStep
1099 remote_event.isPartiallyProcessed = self.isPartiallyProcessed
1100 local_event.step = self.currentStep
1101 local_event.isPartiallyProcessed = self.isPartiallyProcessed
1103 if local_event is None:
1104 # Force empty event generation when local_event doesn't change
1105 # anything
1106 l_events = self.__datamodel.convertEventToLocal(
1107 remote_event, r_obj_complete, allowEmptyEvent=True
1108 )
1109 for local_event in l_events:
1110 self.__datamodel.errorqueue.append(
1111 remote_event, local_event, e.msg
1112 )
1113 else:
1114 self.__datamodel.errorqueue.append(
1115 remote_event, local_event, e.msg
1116 )
1117 else:
1118 raise
    def __processLocalEvent(
        self,
        remote_event: Event | None,
        local_event: Event | None,
        enqueueEventWithError: bool,
        simulateOnly: bool = False,
    ):
        """Apply a local event by dispatching it to the relevant
        added/modified/removed/recycled/trashed local handler.

        remote_event is the remote event this local event was converted from, or
        None; it is only used when appending to the error queue. A None
        local_event is a no-op. When enqueueEventWithError is True, events whose
        object already has unresolved errors (or is a dependency of one that has)
        are appended to the error queue instead of being applied. When
        simulateOnly is True, handlers are not called and caches are left
        untouched.

        Raises HermesClientHandlerError when a handler fails and
        enqueueEventWithError is False.
        """
        if local_event is None:
            __hermes__.logger.debug("__processLocalEvent(None)")
            return

        secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
            local_event.objtype
        )
        __hermes__.logger.debug(
            f"__processLocalEvent({local_event.toString(secretAttrs)})"
        )

        self.__saveRequired = True

        if not simulateOnly:
            # Reset current step
            self.currentStep = local_event.step
            self.isPartiallyProcessed = local_event.isPartiallyProcessed

        if not simulateOnly and enqueueEventWithError:
            hadErrors = self.__datamodel.errorqueue.containsObjectByEvent(
                local_event, isLocalEvent=True
            )
            isParent = self.__datamodel.errorqueue.isEventAParentOfAnotherError(
                local_event, isLocalEvent=True
            )
            if hadErrors or (
                isParent and local_event.eventtype in self.__foreignkeys_events
            ):
                secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
                    local_event.objtype
                )
                if hadErrors:
                    errorMsg = (
                        f"Object in local event {local_event.toString(secretAttrs)}"
                        " already had unresolved errors: appending event to error queue"
                    )
                else:
                    errorMsg = (
                        f"Object in local event {local_event.toString(secretAttrs)}"
                        " is a dependency of an object that already had unresolved"
                        " errors: appending event to error queue"
                    )
                __hermes__.logger.warning(errorMsg)
                # Simulate processing to keep caches consistent with the enqueued
                # event, then enqueue it instead of applying it
                self.__processLocalEvent(
                    None, local_event, enqueueEventWithError=False, simulateOnly=True
                )
                self.__datamodel.errorqueue.append(remote_event, local_event, errorMsg)
                return

        trashbin = self.__datamodel.localdata[f"trashbin_{local_event.objtype}"]
        try:
            match local_event.eventtype:
                case "added":
                    if (
                        self.__trashbin_retention is not None
                        and local_event.objpkey in trashbin
                    ):
                        # Object is in trashbin, recycle it
                        self.__localRecycled(local_event, simulateOnly)
                    else:
                        self.__localAdded(local_event, simulateOnly)
                case "modified":
                    if not simulateOnly and local_event.objpkey in trashbin:
                        # Object is in trashbin, and cannot be modified until it is
                        # restored
                        if enqueueEventWithError:
                            # As the object changes will be processed at restore,
                            # ignore the change
                            self.__processLocalEvent(
                                None,
                                local_event,
                                enqueueEventWithError=False,
                                simulateOnly=True,
                            )
                        else:
                            # Propagate error as requested
                            raise HermesClientHandlerError(
                                f"Object of event {repr(local_event)} is in trashbin,"
                                " and cannot be modified until it is restored"
                            )
                    else:
                        self.__localModified(local_event, simulateOnly)
                case "removed":
                    # Remove object on any of these conditions:
                    # - trashbin retention is disabled
                    # - object is already in trashbin
                    if (
                        self.__trashbin_retention is None
                        or local_event.objpkey in trashbin
                    ):
                        self.__localRemoved(local_event, simulateOnly)
                    else:
                        self.__localTrashed(local_event, simulateOnly)
        except HermesClientHandlerError as e:
            if not simulateOnly and enqueueEventWithError:
                # Simulate processing to keep caches consistent, then enqueue the
                # failed event with its current progress markers
                self.__processLocalEvent(
                    None, local_event, enqueueEventWithError=False, simulateOnly=True
                )
                if remote_event is not None:
                    remote_event.step = self.currentStep
                    remote_event.isPartiallyProcessed = self.isPartiallyProcessed
                local_event.step = self.currentStep
                local_event.isPartiallyProcessed = self.isPartiallyProcessed
                self.__datamodel.errorqueue.append(remote_event, local_event, e.msg)
            else:
                raise
1234 def __remoteAdded(
1235 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1236 ):
1237 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1238 remote_event.objtype
1239 )
1240 __hermes__.logger.debug(f"__remoteAdded({remote_event.toString(secretAttrs)})")
1242 r_obj = self.__datamodel.createRemoteDataobject(
1243 remote_event.objtype, remote_event.objattrs
1244 )
1245 self.__processLocalEvent(
1246 remote_event,
1247 local_event,
1248 enqueueEventWithError=False,
1249 simulateOnly=simulateOnly,
1250 )
1252 # Add remote object to cache
1253 if not simulateOnly:
1254 # May already been added if remote type is used in more than one local type
1255 self.__datamodel.remotedata[remote_event.objtype].append(
1256 r_obj, ignoreIfAlreadyPresent=True
1257 )
1258 # May already been added if current event is from errorqueue
1259 self.__datamodel.remotedata_complete[remote_event.objtype].append(
1260 r_obj, ignoreIfAlreadyPresent=True
1261 )
1263 def __localAdded(self, local_ev: Event, simulateOnly: bool = False):
1264 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1265 local_ev.objtype
1266 )
1267 __hermes__.logger.debug(f"__localAdded({local_ev.toString(secretAttrs)})")
1269 l_obj = self.__datamodel.createLocalDataobject(
1270 local_ev.objtype, local_ev.objattrs
1271 )
1273 if not simulateOnly:
1274 # Call added handler
1275 self.__callHandler(
1276 objtype=local_ev.objtype,
1277 eventtype="added",
1278 objkey=local_ev.objpkey,
1279 eventattrs=local_ev.objattrs,
1280 newobj=deepcopy(l_obj),
1281 )
1283 # Add local object to cache
1284 if not simulateOnly:
1285 self.__datamodel.localdata[local_ev.objtype].append(
1286 l_obj, ignoreIfAlreadyPresent=True
1287 )
1288 # May already been added if current event is from errorqueue
1289 self.__datamodel.localdata_complete[local_ev.objtype].append(
1290 l_obj, ignoreIfAlreadyPresent=True
1291 )
1293 def __remoteRecycled(
1294 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1295 ):
1296 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1297 remote_event.objtype
1298 )
1299 __hermes__.logger.debug(
1300 f"__remoteRecycled({remote_event.toString(secretAttrs)})"
1301 )
1302 maincache = self.__datamodel.remotedata[remote_event.objtype]
1303 maincache_complete = self.__datamodel.remotedata_complete[remote_event.objtype]
1304 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"]
1305 trashbin_complete = self.__datamodel.remotedata_complete[
1306 f"trashbin_{remote_event.objtype}"
1307 ]
1309 r_obj = self.__datamodel.createRemoteDataobject(
1310 remote_event.objtype, remote_event.objattrs
1311 )
1313 self.__processLocalEvent(
1314 remote_event,
1315 local_event,
1316 enqueueEventWithError=False,
1317 simulateOnly=simulateOnly,
1318 )
1320 # Remove remote object from trashbin
1321 if not simulateOnly:
1322 trashbin.removeByPkey(remote_event.objpkey)
1323 trashbin_complete.removeByPkey(remote_event.objpkey)
1324 # Restore remote object, with its potential changes, in main cache
1325 if not simulateOnly:
1326 # May already been added if remote type is used in more than one local type
1327 maincache.append(r_obj, ignoreIfAlreadyPresent=True)
1328 # May already been recycled if current event is from errorqueue
1329 maincache_complete.append(r_obj, ignoreIfAlreadyPresent=True)
    def __localRecycled(self, local_ev: Event, simulateOnly: bool = False):
        """Handle a local "added" event whose object still sits in the local
        trashbin: call the client's "recycled" handler, move the trashed object
        back to the main caches, then, if the incoming event carries changes
        compared to the trashed object, enqueue a follow-up "modified" event.

        Assumes the object is present in the local trashbin (trashbin.get() is
        not None-checked — TODO confirm callers guarantee this).
        """
        secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
            local_ev.objtype
        )
        __hermes__.logger.debug(f"__localRecycled({local_ev.toString(secretAttrs)})")
        maincache = self.__datamodel.localdata[local_ev.objtype]
        maincache_complete = self.__datamodel.localdata_complete[local_ev.objtype]
        trashbin = self.__datamodel.localdata[f"trashbin_{local_ev.objtype}"]
        trashbin_complete = self.__datamodel.localdata_complete[
            f"trashbin_{local_ev.objtype}"
        ]

        # l_obj is the object as described by the incoming event; l_obj_trash is
        # the (copied) object as stored in the trashbin
        l_obj = self.__datamodel.createLocalDataobject(
            local_ev.objtype, local_ev.objattrs
        )
        l_obj_trash: DataObject = deepcopy(trashbin.get(local_ev.objpkey))
        del l_obj_trash._trashbin_timestamp  # Remove trashbin timestamp from object

        l_obj_trash_complete: DataObject = deepcopy(
            trashbin_complete.get(local_ev.objpkey)
        )

        # May already been recycled if current event is from errorqueue
        if l_obj_trash_complete is not None:
            del (
                l_obj_trash_complete._trashbin_timestamp
            )  # Remove trashbin timestamp from object

        if not simulateOnly:
            # Call recycled handler
            self.__callHandler(
                objtype=local_ev.objtype,
                eventtype="recycled",
                objkey=l_obj_trash.getPKey(),
                eventattrs=l_obj_trash.toNative(),
                newobj=deepcopy(l_obj_trash),
            )

        if not simulateOnly:
            # Remove local object from trashbin
            trashbin.remove(l_obj_trash)
            # Restore local object in main cache
            maincache.append(l_obj_trash, ignoreIfAlreadyPresent=True)

            # May already been recycled if current event is from errorqueue
            if l_obj_trash_complete is not None:
                trashbin_complete.remove(l_obj_trash_complete)
                maincache_complete.append(l_obj_trash_complete, ignoreIfAlreadyPresent=True)

        diff = l_obj.diffFrom(l_obj_trash)  # Handle local object changes if any
        if diff and not simulateOnly:
            event, obj = Event.fromDiffItem(
                diffitem=diff,
                eventCategory=local_ev.evcategory,
                changeType="modified",
            )
            # Hack: we pass this second event (modified) to error queue in order to
            # postpone its processing once all caches of previous one are up to date.
            # Otherwise, if an error is met on this second event (modified), we'll try
            # to reprocess the first one (recycled)
            self.__datamodel.errorqueue.append(
                remoteEvent=None, localEvent=event, errorMsg=None
            )
            # ... and force error queue to be retried asap in order to process the
            # pending event
            self.__errorQueue_lastretry = datetime(year=1, month=1, day=1)
    def __remoteModified(
        self, remote_event: Event, local_event: Event, simulateOnly: bool = False
    ):
        """Handle a remote "modified" event: propagate it locally, then replace
        the cached remote object with its updated version in both remote caches
        (skipped when simulating)."""
        secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
            remote_event.objtype
        )
        __hermes__.logger.debug(
            f"__remoteModified({remote_event.toString(secretAttrs)})"
        )
        maincache = self.__datamodel.remotedata[remote_event.objtype]

        # r_obj is only needed (and only defined) when changes will actually be
        # applied to the cache below
        if not simulateOnly:
            r_cachedobj: DataObject = maincache.get(remote_event.objpkey)
            r_obj = Datamodel.getUpdatedObject(r_cachedobj, remote_event.objattrs)

        cache_complete, r_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin(
            self.__datamodel.remotedata_complete,
            remote_event.objtype,
            remote_event.objpkey,
        )
        # NOTE(review): r_cachedobj_complete may be None here when the object is
        # absent from the "complete" cache — Datamodel.getUpdatedObject is
        # presumably tolerant of that; confirm against its implementation
        r_obj_complete = Datamodel.getUpdatedObject(
            r_cachedobj_complete, remote_event.objattrs
        )

        self.__processLocalEvent(
            remote_event,
            local_event,
            enqueueEventWithError=False,
            simulateOnly=simulateOnly,
        )

        # Update remote object in cache
        if not simulateOnly:
            maincache.replace(r_obj)

            # May not exist
            if cache_complete is not None:
                cache_complete.replace(r_obj_complete)
    def __localModified(self, local_ev: Event, simulateOnly: bool = False):
        """Handle a local "modified" event: call the client's "modified" handler,
        then replace the cached local object with its updated version in both
        local caches (skipped when simulating)."""
        secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
            local_ev.objtype
        )
        __hermes__.logger.debug(f"__localModified({local_ev.toString(secretAttrs)})")
        maincache = self.__datamodel.localdata[local_ev.objtype]

        # l_obj / l_cachedobj are only needed (and only defined) when the handler
        # is called and the cache updated below
        if not simulateOnly:
            l_cachedobj: DataObject = maincache.get(local_ev.objpkey)
            l_obj = Datamodel.getUpdatedObject(l_cachedobj, local_ev.objattrs)

        cache_complete, l_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin(
            self.__datamodel.localdata_complete, local_ev.objtype, local_ev.objpkey
        )

        # May not exist
        if cache_complete is not None:
            l_obj_complete = Datamodel.getUpdatedObject(
                l_cachedobj_complete, local_ev.objattrs
            )

        if not simulateOnly:
            # Call modified handler; copies are passed so the handler cannot
            # mutate the cached instances
            self.__callHandler(
                objtype=local_ev.objtype,
                eventtype="modified",
                objkey=local_ev.objpkey,
                eventattrs=local_ev.objattrs,
                newobj=deepcopy(l_obj),
                cachedobj=deepcopy(l_cachedobj),
            )

        # Update local object in cache
        if not simulateOnly:
            maincache.replace(l_obj)

            # May not exist
            if cache_complete is not None:
                cache_complete.replace(l_obj_complete)
1477 def __remoteTrashed(
1478 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1479 ):
1480 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1481 remote_event.objtype
1482 )
1483 __hermes__.logger.debug(
1484 f"__remoteTrashed({remote_event.toString(secretAttrs)})"
1485 )
1486 maincache = self.__datamodel.remotedata[remote_event.objtype]
1487 maincache_complete = self.__datamodel.remotedata_complete[remote_event.objtype]
1488 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"]
1489 trashbin_complete = self.__datamodel.remotedata_complete[
1490 f"trashbin_{remote_event.objtype}"
1491 ]
1493 r_cachedobj: DataObject = maincache.get(remote_event.objpkey)
1494 r_cachedobj_complete: DataObject = maincache_complete.get(remote_event.objpkey)
1496 self.__processLocalEvent(
1497 remote_event,
1498 local_event,
1499 enqueueEventWithError=False,
1500 simulateOnly=simulateOnly,
1501 )
1503 if not simulateOnly:
1504 # Remove remote object from cache
1505 # May already been removed if remote type is used in more than one local
1506 # type
1507 if r_cachedobj is not None:
1508 maincache.remove(r_cachedobj)
1509 r_cachedobj._trashbin_timestamp = remote_event.timestamp
1510 # Add remote object to trashbin
1511 # May already been added if remote type is used in more than one local
1512 # type
1513 trashbin.append(r_cachedobj, ignoreIfAlreadyPresent=True)
1515 if r_cachedobj_complete is not None:
1516 maincache_complete.remove(r_cachedobj_complete)
1517 r_cachedobj_complete._trashbin_timestamp = remote_event.timestamp
1518 # May already been added if remote type is used in more than one local type
1519 trashbin_complete.append(r_cachedobj_complete, ignoreIfAlreadyPresent=True)
1521 def __localTrashed(self, local_ev: Event, simulateOnly: bool = False):
1522 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1523 local_ev.objtype
1524 )
1525 __hermes__.logger.debug(f"__localTrashed({local_ev.toString(secretAttrs)})")
1526 maincache = self.__datamodel.localdata[local_ev.objtype]
1527 maincache_complete = self.__datamodel.localdata_complete[local_ev.objtype]
1528 trashbin = self.__datamodel.localdata[f"trashbin_{local_ev.objtype}"]
1529 trashbin_complete = self.__datamodel.localdata_complete[
1530 f"trashbin_{local_ev.objtype}"
1531 ]
1533 l_cachedobj: DataObject = maincache.get(local_ev.objpkey)
1534 l_cachedobj_complete: DataObject = maincache_complete.get(local_ev.objpkey)
1536 if not simulateOnly:
1537 # Call trashed handler
1538 self.__callHandler(
1539 objtype=local_ev.objtype,
1540 eventtype="trashed",
1541 objkey=local_ev.objpkey,
1542 eventattrs=local_ev.objattrs,
1543 cachedobj=deepcopy(l_cachedobj),
1544 )
1546 if not simulateOnly:
1547 # Remove local object from cache
1548 maincache.remove(l_cachedobj)
1549 l_cachedobj._trashbin_timestamp = local_ev.timestamp
1550 # Add local object to trashbin
1551 trashbin.append(l_cachedobj, ignoreIfAlreadyPresent=True)
1553 if l_cachedobj_complete is not None:
1554 maincache_complete.remove(l_cachedobj_complete)
1555 l_cachedobj_complete._trashbin_timestamp = local_ev.timestamp
1556 trashbin_complete.append(l_cachedobj_complete, ignoreIfAlreadyPresent=True)
1558 def __remoteRemoved(
1559 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1560 ):
1561 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1562 remote_event.objtype
1563 )
1564 __hermes__.logger.debug(
1565 f"__remoteRemoved({remote_event.toString(secretAttrs)})"
1566 )
1568 cache, r_cachedobj = Datamodel.getObjectFromCacheOrTrashbin(
1569 self.__datamodel.remotedata,
1570 remote_event.objtype,
1571 remote_event.objpkey,
1572 )
1574 cache_complete, r_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin(
1575 self.__datamodel.remotedata_complete,
1576 remote_event.objtype,
1577 remote_event.objpkey,
1578 )
1580 self.__processLocalEvent(
1581 remote_event,
1582 local_event,
1583 enqueueEventWithError=False,
1584 simulateOnly=simulateOnly,
1585 )
1587 # Remove remote object from cache or trashbin
1588 # May already been removed if remote type is used in more than one local type
1589 if not simulateOnly:
1590 if r_cachedobj is not None:
1591 cache.remove(r_cachedobj)
1593 # May already been removed if current event is from errorqueue
1594 if cache_complete is not None:
1595 # May already been removed if remote type is used in more than one local
1596 # type
1597 if r_cachedobj_complete is not None:
1598 cache_complete.remove(r_cachedobj_complete)
1600 def __localRemoved(self, local_ev: Event, simulateOnly: bool = False):
1601 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1602 local_ev.objtype
1603 )
1604 __hermes__.logger.debug(f"__localRemoved({local_ev.toString(secretAttrs)})")
1606 cache, l_cachedobj = Datamodel.getObjectFromCacheOrTrashbin(
1607 self.__datamodel.localdata,
1608 local_ev.objtype,
1609 local_ev.objpkey,
1610 )
1612 cache_complete, l_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin(
1613 self.__datamodel.localdata_complete,
1614 local_ev.objtype,
1615 local_ev.objpkey,
1616 )
1618 if not simulateOnly:
1619 # Call removed handler
1620 self.__callHandler(
1621 objtype=local_ev.objtype,
1622 eventtype="removed",
1623 objkey=local_ev.objpkey,
1624 eventattrs=local_ev.objattrs,
1625 cachedobj=deepcopy(l_cachedobj),
1626 )
1628 # Remove local object from cache or trashbin
1629 if not simulateOnly:
1630 cache.remove(l_cachedobj)
1631 # May already been removed if current event is from errorqueue
1632 if cache_complete is not None:
1633 cache_complete.remove(l_cachedobj_complete)
1635 def __callHandler(self, objtype: str, eventtype: str, **kwargs):
1636 if not objtype:
1637 handlerName = f"on_{eventtype}"
1638 kwargs_filtered = kwargs
1639 else:
1640 handlerName = f"on_{objtype}_{eventtype}"
1642 # Filter secrets values
1643 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(objtype)
1644 kwargs_filtered = kwargs.copy()
1645 kwargs_filtered["eventattrs"] = Event.objattrsToString(
1646 kwargs["eventattrs"], secretAttrs
1647 )
1649 kwargsstr = ", ".join([f"{k}={repr(v)}" for k, v in kwargs_filtered.items()])
1651 hdlr = getattr(self, handlerName, None)
1653 if not callable(hdlr):
1654 __hermes__.logger.debug(
1655 f"Calling '{handlerName}({kwargsstr})': handler '{handlerName}()'"
1656 " doesn't exists"
1657 )
1658 return
1660 __hermes__.logger.info(
1661 f"Calling '{handlerName}({kwargsstr})' - currentStep={self.currentStep},"
1662 f" isPartiallyProcessed={self.isPartiallyProcessed},"
1663 f" isAnErrorRetry={self.isAnErrorRetry}"
1664 )
1666 try:
1667 hdlr(**kwargs)
1668 except Exception as e:
1669 __hermes__.logger.error(
1670 f"Calling '{handlerName}({kwargsstr})': error met on step"
1671 f" {self.currentStep} '{str(e)}'"
1672 )
1673 raise HermesClientHandlerError(e)
    def __processDatamodelUpdate(self):
        """Check difference between current datamodel and previous one. If datamodel has
        changed, generate local events according to datamodel changes.

        Removed local types are purged first (using the previous datamodel), then
        the new datamodel is installed, and finally local events are generated
        from the data differences induced by added/modified types.
        """

        diff = self.__newdatamodel.diffFrom(self.__datamodel)

        if not diff:
            __hermes__.logger.info("No change in datamodel")
            # Start working with new datamodel
            self.__datamodel = self.__newdatamodel
            self.__datamodel.loadErrorQueue()
            return

        __hermes__.logger.info(f"Datamodel has changed: {diff.dict}")
        self.__saveRequired = True

        # Start by removed types, as it requires the previous datamodel to process data
        # removal
        if diff.removed:
            for l_objtype in diff.removed:
                __hermes__.logger.info(f"About to purge data from type '{l_objtype}'")
                for ltypes in self.__datamodel.typesmapping.values():
                    if l_objtype in ltypes:
                        break
                else:
                    # l_objtype was not found in each list from
                    # self.__datamodel.typesmapping.values()
                    __hermes__.logger.warning(
                        f"Requested to purge data from type '{l_objtype}', but it"
                        " doesn't exist in previous datamodel: ignoring"
                    )
                    continue

                # Collect pkeys from both the main cache and the trashbin
                pkeys = (
                    self.__datamodel.localdata[l_objtype].getPKeys()
                    | self.__datamodel.localdata[f"trashbin_{l_objtype}"].getPKeys()
                )

                # Call remove on each object
                for pkey in pkeys:
                    _, l_obj = Datamodel.getObjectFromCacheOrTrashbin(
                        self.__datamodel.localdata, l_objtype, pkey
                    )
                    if l_obj:
                        l_ev = Event(
                            evcategory="base",
                            eventtype="removed",
                            obj=l_obj,
                            objattrs={},
                        )
                        secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
                            l_objtype
                        )
                        __hermes__.logger.debug(
                            f"Removing local object of {pkey=}:"
                            f" {l_ev.toString(secretAttrs)=}"
                        )
                        self.__localRemoved(l_ev)
                    else:
                        __hermes__.logger.error(f"Local object of {pkey=} not found")

                # All objects have been removed, remove remaining events from
                # errorqueue, if any
                for l_obj in self.__datamodel.localdata_complete[l_objtype]:
                    # Remove eventual events relative to current object from error queue
                    self.__datamodel.errorqueue.purgeAllEventsOfDataObject(
                        l_obj, isLocalObjtype=True
                    )

            self.__datamodel.saveLocalAndRemoteData()  # Save changes

            # Purge old remote and local cache files, if any
            __hermes__.logger.info(
                f"Types removed from Datamodel: {diff.removed}, purging cache files"
            )
            Datamodel.purgeOldCacheFiles(diff.removed, cacheFilePrefix="__")

        # Start working with new datamodel
        self.__datamodel = self.__newdatamodel
        self.__datamodel.loadLocalAndRemoteData()

        # Reload error queue to allow it to handle new datamodel
        self.__datamodel.saveErrorQueue()
        self.__datamodel.loadErrorQueue()

        if diff.added or diff.modified:
            # Generate diff events according to datamodel changes
            new_local_data: dict[str, DataObjectList] = {}

            # Work on "complete" copy of data cache, representing the cache that should
            # be without any event in error queue in order to compute diff on complete
            # cache representation
            completeRemoteData = self.__datamodel.remotedata_complete
            completeLocalData = self.__datamodel.localdata_complete

            # Loop over each remote type
            for r_objtype in self.__datamodel.remote_schema.objectTypes:
                # For each type, we'll work on classic data, and on trashbin data
                for prefix in ("", "trashbin_"):
                    if r_objtype not in self.__datamodel.typesmapping:
                        continue  # Remote objtype isn't set in current Datamodel

                    # Fetch corresponding local type
                    for l_objt in self.__datamodel.typesmapping[r_objtype]:
                        l_objtype = f"{prefix}{l_objt}"

                        # Convert remote data cache to local data
                        new_local_data[l_objtype] = (
                            self.__datamodel.convertDataObjectListToLocal(
                                r_objtype,
                                completeRemoteData[f"{prefix}{r_objtype}"],
                                l_objt,
                            )
                        )

                        # Compute differences between new local data and local data
                        # cache
                        completeLocalDataObjtype: DataObjectList = (
                            completeLocalData.get(l_objtype, DataObjectList([]))
                        )
                        datadiff = new_local_data[l_objtype].diffFrom(
                            completeLocalDataObjtype
                        )

                        for changeType, difflist in datadiff.dict.items():
                            diffitem: DiffObject | DataObject
                            for diffitem in difflist:
                                # Convert diffitem to local Event
                                event, obj = Event.fromDiffItem(
                                    diffitem=diffitem,
                                    eventCategory="base",
                                    changeType=changeType,
                                )

                                if prefix == "trashbin_":
                                    if obj not in completeLocalDataObjtype:
                                        # Object exists in remote trashbin, but not in
                                        # local one as it has been removed before its
                                        # type was added to client's Datamodel.
                                        # Process a local "added" event, then a local
                                        # "removed" event to store local object in
                                        # trashbin

                                        # Add local object
                                        self.__processLocalEvent(
                                            None, event, enqueueEventWithError=True
                                        )

                                        # Prepare "removed" event
                                        event = Event(
                                            evcategory="base",
                                            eventtype="removed",
                                            obj=obj,
                                            objattrs={},
                                        )
                                        # Preserve object _trashbin_timestamp
                                        event.timestamp = completeRemoteData[
                                            f"{prefix}{r_objtype}"
                                        ][obj]._trashbin_timestamp
                                    else:
                                        # Preserve object _trashbin_timestamp
                                        obj._trashbin_timestamp = (
                                            completeLocalDataObjtype[
                                                obj
                                            ]._trashbin_timestamp
                                        )

                                # Process Event and update cache if no error is met,
                                # enqueue event otherwise
                                self.__processLocalEvent(
                                    None, event, enqueueEventWithError=True
                                )

        self.__config.savecachefile()  # Save config to be able to rebuild datamodel
        self.__datamodel.saveLocalAndRemoteData()  # Save data
        self.__checkDatamodelWarnings()
    def __status(
        self,
        verbose: bool = False,
        level: str = "information",
        ignoreUnhandledExceptions: bool = False,
    ) -> dict[str, dict[str, dict[str, Any]]]:
        """Returns a dict containing status for current client Datamodel and error
        queue.

        Each status contains 3 categories/levels: "information", "warning" and "error"

        The result maps section name (appname, "datamodel", "errorQueue") to
        category to details. Categories below the requested level — and, unless
        verbose, empty categories/sections — are stripped from the result.

        Raises AttributeError when level is not one of "information", "warning",
        "error".
        """
        if level not in ("information", "warning", "error"):
            raise AttributeError(
                f"Specified level '{level}' is invalid. Possible values are"
                """("information", "warning", "error"):"""
            )

        appname: str = self.__config["appname"]

        # Compute the categories to keep: the requested level and every more
        # severe one
        match level:
            case "error":
                levels = ["error"]
            case "warning":
                levels = [
                    "warning",
                    "error",
                ]
            case "information":
                levels = [
                    "information",
                    "warning",
                    "error",
                ]

        # Client section: run state and pause information
        res = {
            appname: {
                "information": {
                    "startTime": self.__startTime.strftime("%Y-%m-%d %H:%M:%S"),
                    "status": "paused" if self.__isPaused else "running",
                    "pausedSince": (
                        self.__isPaused.strftime("%Y-%m-%d %H:%M:%S")
                        if self.__isPaused
                        else "None"
                    ),
                },
                "warning": {},
                "error": {},
            },
        }
        if not ignoreUnhandledExceptions and self.__cache.exception:
            res[appname]["error"]["unhandledException"] = self.__cache.exception

        # Datamodel
        res["datamodel"] = {
            "information": {},
            "warning": {},
            "error": {},
        }
        if self.__datamodel.unknownRemoteTypes:
            res["datamodel"]["warning"]["unknownRemoteTypes"] = sorted(
                self.__datamodel.unknownRemoteTypes
            )
        if self.__datamodel.unknownRemoteAttributes:
            res["datamodel"]["warning"]["unknownRemoteAttributes"] = {
                k: sorted(v)
                for k, v in self.__datamodel.unknownRemoteAttributes.items()
            }

        # Error queue
        res["errorQueue"] = {
            "information": {},
            "warning": {},
            "error": {},
        }

        if self.__datamodel.errorqueue is not None:
            eventNumber: int
            remoteEvent: Event | None
            localEvent: Event
            errorMsg: str
            for (
                eventNumber,
                remoteEvent,
                localEvent,
                errorMsg,
            ) in self.__datamodel.errorqueue:
                if errorMsg is None:
                    # Ignore the events in queue that are not errors
                    continue

                # Always try to get object from local cache in order to use configured
                # toString template for obj repr()
                objtype = localEvent.objtype

                _, obj = Datamodel.getObjectFromCacheOrTrashbin(
                    self.__datamodel.localdata_complete, objtype, localEvent.objpkey
                )
                if obj is None:
                    _, obj = Datamodel.getObjectFromCacheOrTrashbin(
                        self.__datamodel.localdata, objtype, localEvent.objpkey
                    )
                if obj is None and remoteEvent is not None:
                    _, obj = Datamodel.getObjectFromCacheOrTrashbin(
                        self.__datamodel.remotedata_complete,
                        remoteEvent.objtype,
                        remoteEvent.objpkey,
                    )
                # Fall back to a generic "<type[pkey]>" representation when the
                # object is absent from every cache
                if obj is None:
                    obj = f"<{localEvent.objtype}[{localEvent.objpkey}]>"
                else:
                    obj = repr(obj)

                res["errorQueue"]["error"][eventNumber] = {
                    "objrepr": obj,
                    "errorMsg": errorMsg,
                }
                if verbose:
                    res["errorQueue"]["error"][eventNumber] |= {
                        "objtype": localEvent.objtype,
                        "objpkey": localEvent.objpkey,
                        "objattrs": localEvent.objattrs,
                    }

        # Clean empty categories
        for objname in list(res.keys()):
            for category in ("information", "warning", "error"):
                if category not in levels or (
                    not verbose and not res[objname][category]
                ):
                    del res[objname][category]

            if not verbose and not res[objname]:
                del res[objname]

        return res
1985 def __notifyQueueErrors(self):
1986 """Notify of any objects change in error queue"""
1987 new_error = self.__status(level="error", ignoreUnhandledExceptions=True)
1989 new_errors = {}
1990 if "errorQueue" in new_error:
1991 for errNumber, err in new_error["errorQueue"]["error"].items():
1992 new_errors[errNumber] = f"{err['objrepr']}: {err['errorMsg']}"
1994 new_errstr = json.dumps(
1995 new_errors,
1996 cls=JSONEncoder,
1997 indent=4,
1998 )
1999 old_errstr = json.dumps(self.__cache.queueErrors, cls=JSONEncoder, indent=4)
2001 if new_errstr != old_errstr:
2002 if new_errors:
2003 desc = "objects in error queue have changed"
2004 else:
2005 desc = "no more objects in error queue"
2007 __hermes__.logger.info(desc)
2008 Email.sendDiff(
2009 config=self.__config,
2010 contentdesc=desc,
2011 previous=old_errstr,
2012 current=new_errstr,
2013 )
2014 self.__cache.queueErrors = new_errors
2016 def __notifyDatamodelWarnings(self):
2017 """Notify of any data model warnings changes"""
2018 new_errors = self.__status(level="warning", ignoreUnhandledExceptions=True)
2020 if "datamodel" not in new_errors:
2021 new_errors = {}
2022 else:
2023 new_errors = new_errors["datamodel"]
2025 new_errstr = json.dumps(
2026 new_errors,
2027 cls=JSONEncoder,
2028 indent=4,
2029 )
2030 old_errstr = json.dumps(
2031 self.__cache.datamodelWarnings, cls=JSONEncoder, indent=4
2032 )
2034 if new_errors:
2035 __hermes__.logger.error("Datamodel has warnings:\n" + new_errstr)
2037 if new_errstr != old_errstr:
2038 if new_errors:
2039 desc = "datamodel warnings have changed"
2040 else:
2041 desc = "no more datamodel warnings"
2043 __hermes__.logger.info(desc)
2044 Email.sendDiff(
2045 config=self.__config,
2046 contentdesc=desc,
2047 previous=old_errstr,
2048 current=new_errstr,
2049 )
2050 self.__cache.datamodelWarnings = new_errors
2052 def __notifyException(self, trace: str | None):
2053 """Notify of any unhandled exception met/solved"""
2054 if trace:
2055 __hermes__.logger.critical(f"Unhandled exception: {trace}")
2057 if self.__cache.exception != trace:
2058 if trace:
2059 desc = "unhandled exception"
2060 else:
2061 desc = "no more unhandled exception"
2063 __hermes__.logger.info(desc)
2064 previous = "" if self.__cache.exception is None else self.__cache.exception
2065 current = "" if trace is None else trace
2066 Email.sendDiff(
2067 config=self.__config,
2068 contentdesc=desc,
2069 previous=previous,
2070 current=current,
2071 )
2072 self.__cache.exception = trace
    def __notifyFatalException(self, trace: str):
        """Notify of any fatal exception met before, and terminate app

        Logs the traceback at critical level and sends a notification email,
        after flagging the client as stopped so the main loop exits.

        :param trace: formatted traceback of the fatal exception
        """
        # Flag the client as stopped so the main loop terminates
        self.__isStopped = True

        desc = "Unhandled fatal exception, APP WILL TERMINATE IMMEDIATELY"
        # Newline held in a variable: f-string expressions couldn't contain
        # backslashes before Python 3.12
        NL = "\n"

        __hermes__.logger.critical(f"{desc}: {trace}")

        # Plain Email.send (not sendDiff): a fatal error is a one-shot alert,
        # there is no previous/current state to compare
        Email.send(
            config=self.__config,
            subject=f"[{self.__config['appname']}] {desc}",
            content=f"{desc}:{NL}{NL}{trace}",
        )