Coverage for clients/__init__.py: 83% (882 statements)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from clients.datamodel import Datamodel, InvalidDataError
24from lib.config import HermesConfig
25from lib.version import HERMES_VERSION
26from lib.datamodel.dataobject import DataObject
27from lib.datamodel.dataobjectlist import DataObjectList
28from lib.datamodel.datasource import Dataschema, Datasource
29from lib.datamodel.diffobject import DiffObject
30from lib.datamodel.event import Event
31from lib.datamodel.serialization import LocalCache, JSONEncoder
32from lib.plugins import AbstractMessageBusConsumerPlugin
33from lib.utils.mail import Email
34from lib.utils.socket import (
35 SockServer,
36 SocketMessageToServer,
37 SocketMessageToClient,
38 SocketArgumentParser,
39 SocketParsingError,
40 SocketParsingMessage,
41)
43from copy import deepcopy
44from datetime import datetime, timedelta
45from time import sleep
46from types import FrameType
47from typing import Any
48import argparse
49import json
50import signal
51import traceback
54class HermesAlreadyNotifiedException(Exception):
55 """Raised when an exception has already been notified, to avoid a second
56 notification"""
59class HermesClientHandlerError(Exception):
60 """Raised when an exception is met during client handler call"""
62 def __init__(self, err: Exception | str | None):
63 if isinstance(err, Exception):
64 self.msg: str | None = HermesClientHandlerError.exceptionToString(err)
65 """Printable error message, None in a rare case where exception is raised
66 without error, but to postpone an event processing"""
67 else:
68 self.msg = err
69 super().__init__(self.msg)
71 @staticmethod
72 def exceptionToString(
73 exception: Exception, purgeCurrentFileFromTrace: bool = True
74 ) -> str:
75 """Convert the specified exception to a string containing its full trace"""
76 lines = traceback.format_exception(
77 type(exception), exception, exception.__traceback__
78 )
80 if purgeCurrentFileFromTrace:
81 # Purge current file info from the traceback
82 lines = [line for line in lines if __file__ not in line]
84 return "".join(lines).strip()
87class HermesClientCache(LocalCache):
88 """Hermes client data to cache"""
90 def __init__(self, from_json_dict: dict[str, Any] = {}):
91 super().__init__(
92 jsondataattr=[
93 "queueErrors",
94 "datamodelWarnings",
95 "exception",
96 "initstartoffset",
97 "initstopoffset",
98 "nextoffset",
99 ],
100 )
102 self.queueErrors: dict[str, str] = from_json_dict.get("queueErrors", {})
103 """Dictionary containing current objects in error queue, for notifications"""
105 self.datamodelWarnings: dict[str, dict[str, dict[str, Any]]] = (
106 from_json_dict.get("datamodelWarnings", {})
107 )
108 """Dictionary containing current datamodel warnings, for notifications"""
110 self.exception: str | None = from_json_dict.get("exception")
111 """String containing latest exception trace"""
113 self.initstartoffset: Any | None = from_json_dict.get("initstartoffset")
114 """Contains the offset of the first message of initSync sequence on message
115 bus"""
116 self.initstopoffset: Any | None = from_json_dict.get("initstopoffset")
117 """Contains the offset of the last message of initSync sequence on message
118 bus"""
119 self.nextoffset: Any | None = from_json_dict.get("nextoffset")
120 """Contains the offset of the next message to process on message bus"""
122 def savecachefile(self, cacheFilename: str | None = None):
123 """Override method only to disable backup files in cache"""
124 return super().savecachefile(cacheFilename, dontKeepBackup=True)
127class GenericClient:
128 """Superclass of all hermes-client implementations.
129 Manage all the internals of hermes for a client: datamodel updates, caching, error
130 management, trashbin and converting messages from message bus into events handlers
131 calls"""
133 __FOREIGNKEYS_POLICIES: dict[str, tuple[str]] = {
134 "disabled": tuple(),
135 "on_remove_event": ("removed",),
136 "on_every_event": ("added", "modified", "removed"),
137 }
138 """Different foreignkeys_policy settings : associate each foreignkeys_policy
139 (as key) with the list of event types that will be placed in the error queue if the
140 object concerning them is the parent (by foreign key) of an object already present
141 in the error queue"""
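# For example, with "on_remove_event" only "removed" events whose object is the
# parent (by foreign key) of an already queued error are delayed into the error
# queue, while "added" and "modified" events on such a parent are processed
# immediately; "disabled" never delays on parenthood, "on_every_event" delays all
# three event types.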
143 def __init__(self, config: HermesConfig):
144 """Instantiate a new client"""
146 __hermes__.logger.info(f"Starting {config['appname']} v{HERMES_VERSION}")
148 # Setup the signals handler
149 config.setSignalsHandler(self.__signalHandler)
151 self.__config: HermesConfig = config
152 """Current config"""
153 try:
154 self.config: dict[str, Any] = self.__config[self.__config["appname"]]
155 """Dict containing the client plugin configuration"""
156 except KeyError:
157 self.config = {}
159 self.__previousconfig: HermesConfig = HermesConfig.loadcachefile(
160 "_hermesconfig"
161 )
162 """Previous config (from cache)"""
164 self.__msgbus: AbstractMessageBusConsumerPlugin = self.__config["hermes"][
165 "plugins"
166 ]["messagebus"]["plugininstance"]
167 self.__msgbus.setTimeout(
168 self.__config["hermes-client"]["updateInterval"] * 1000
169 )
171 self.__cache: HermesClientCache = HermesClientCache.loadcachefile(
172 f"_{self.__config['appname']}"
173 )
174 """Cached attributes"""
175 self.__cache.setCacheFilename(f"_{self.__config['appname']}")
177 self.__startTime: datetime | None = None
178 """Datetime when mainloop was started"""
180 self.__isPaused: datetime | None = None
181 """Contains pause datetime if standard processing is paused, None otherwise"""
183 self.__isStopped: bool = False
184 """mainloop() will run until this var is set to True"""
186 self.__numberOfLoopToProcess: int | None = None
187 """**For functionnal tests only**, if a value is set, will process for *value*
188 iterations of mainloop and pause execution until a new positive value is set"""
190 self.__sock: SockServer | None = None
191 """Facultative socket to allow cli communication"""
192 if (
193 self.__config["hermes"]["cli_socket"]["path"] is not None
194 or self.__config["hermes"]["cli_socket"]["dont_manage_sockfile"] is not None
195 ):
196 self.__sock = SockServer(
197 path=self.__config["hermes"]["cli_socket"]["path"],
198 owner=self.__config["hermes"]["cli_socket"]["owner"],
199 group=self.__config["hermes"]["cli_socket"]["group"],
200 mode=self.__config["hermes"]["cli_socket"]["mode"],
201 processHdlr=self.__processSocketMessage,
202 dontManageSockfile=self.__config["hermes"]["cli_socket"][
203 "dont_manage_sockfile"
204 ],
205 )
206 self.__setupSocketParser()
208 self.__useFirstInitsyncSequence: bool = self.__config["hermes-client"][
209 "useFirstInitsyncSequence"
210 ]
211 """Indicate if we prefer using the first/oldest or last/most recent
212 initsync sequence available on message bus"""
214 self.__newdatamodel: Datamodel = Datamodel(config=self.__config)
215 """New datamodel (from current config)"""
217 if self.__previousconfig.hasData():
218 # Start app with previous datamodel to be able to check differences with
219 # new one through self.__processDatamodelUpdate()
220 self.__datamodel: Datamodel = Datamodel(config=self.__previousconfig)
221 """Current datamodel"""
222 else:
223 # As no previous datamodel is available, start app with new one
224 self.__datamodel = self.__newdatamodel
226 self.__trashbin_retention: timedelta | None = None
227 """Timedelta with delay to keep removed data in trashbin before permanently
228 deleting it. 'None' means no trashbin"""
229 if self.__config["hermes-client"]["trashbin_retention"] > 0:
230 self.__trashbin_retention = timedelta(
231 days=self.__config["hermes-client"]["trashbin_retention"]
232 )
234 self.__trashbin_purgeInterval: timedelta | None = timedelta(
235 minutes=self.__config["hermes-client"]["trashbin_purgeInterval"]
236 )
237 """Timedelta with delay between two trashbin purge attempts"""
239 self.__trashbin_lastpurge: datetime = datetime(year=1, month=1, day=1)
240 """Datetime when latest trashbin purge was ran"""
242 self.__errorQueue_retryInterval: timedelta | None = timedelta(
243 minutes=self.__config["hermes-client"]["errorQueue_retryInterval"]
244 )
245 """Timedelta with delay between two attempts of processing events in error"""
247 self.__errorQueue_lastretry: datetime = datetime(year=1, month=1, day=1)
248 """Datetime when latest error queue retry was ran"""
250 self.__currentStep: int = 0
251 """Store the step number of current event processing. Will be stored in events
252 in error queue to allow clients to resume an event where it has failed"""
254 self.__isPartiallyProcessed: bool = False
255 """Store if some data has been processed during current event processing. Will
256 be stored in events in error queue to handle autoremediation properly"""
258 self.__isAnErrorRetry: bool = False
259 """Indicate to handler whether the current event is being processed as part of
260 an error retry"""
262 self.__saveRequired: bool = False
263 """Reset to False at each loop start, and if any change is made during
264 processing, set to True in order to save all cache at the loop end.
265 Used to avoid expensive .save() calls when unnecessary
266 """
268 self.__foreignkeys_events: tuple[str] = self.__FOREIGNKEYS_POLICIES[
269 self.__config["hermes-client"]["foreignkeys_policy"]
270 ]
271 """List of event types that will be placed in the error queue if the object
272 concerning them is the parent (by foreign key) of an object already present in
273 the error queue"""
275 def getObjectFromCache(self, objtype: str, objpkey: Any) -> DataObject:
276 """Returns a deepcopy of an object from cache.
277 Raise IndexError if objtype is invalid, or if objpkey is not found
278 """
279 ds: Datasource = self.__datamodel.localdata
281 (_, obj) = Datamodel.getObjectFromCacheOrTrashbin(ds, objtype, objpkey)
282 if obj is None:
283 raise IndexError(
284 f"No object of {objtype=} with {objpkey=} was found in cache"
285 )
287 return deepcopy(obj)
289 def getDataobjectlistFromCache(self, objtype: str) -> DataObjectList:
290 """Returns cache of specified objtype, by reference.
291 WARNING: Any modification of the cache content will mess up your client !!!
292 Raise IndexError if objtype is invalid
293 """
294 ds: Datasource = self.__datamodel.localdata
295 cache = ds[objtype]
296 trashbin = ds[f"trashbin_{objtype}"]
298 # Create an empty DataObjectList of same type as cache
299 res = type(cache)(objlist=[])
301 res.extend(cache)
302 res.extend(trashbin)
303 return res
305 def __signalHandler(self, signalnumber: int, frame: FrameType | None):
306 """Signal handler that will be called on SIGINT and SIGTERM"""
307 __hermes__.logger.critical(
308 f"Signal '{signal.strsignal(signalnumber)}' received, terminating"
309 )
310 self.__isStopped = True
312 def __setupSocketParser(self):
313 """Set up the argparse context for unix socket commands"""
314 self.__parser = SocketArgumentParser(
315 prog=f"{self.__config['appname']}-cli",
316 description=f"Hermes client {self.__config['appname']} CLI",
317 exit_on_error=False,
318 )
320 subparsers = self.__parser.add_subparsers(help="Sub-commands")
322 # Quit
323 sp_quit = subparsers.add_parser("quit", help=f"Stop {self.__config['appname']}")
324 sp_quit.set_defaults(func=self.__sock_quit)
326 # Pause
327 sp_pause = subparsers.add_parser(
328 "pause", help="Pause processing until 'resume' command is sent"
329 )
330 sp_pause.set_defaults(func=self.__sock_pause)
332 # Resume
333 sp_resume = subparsers.add_parser(
334 "resume", help="Resume processing that has been paused with 'pause'"
335 )
336 sp_resume.set_defaults(func=self.__sock_resume)
338 # Status
339 sp_status = subparsers.add_parser(
340 "status", help=f"Show {self.__config['appname']} status"
341 )
342 sp_status.set_defaults(func=self.__sock_status)
343 sp_status.add_argument(
344 "-j",
345 "--json",
346 action="store_const",
347 const=True,
348 default=False,
349 help="Print status as json",
350 )
351 sp_status.add_argument(
352 "-v",
353 "--verbose",
354 action="store_const",
355 const=True,
356 default=False,
357 help="Output items without values",
358 )
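# The resulting CLI accepts, over the unix socket, the sub-commands defined above,
# e.g. (illustrative): "quit", "pause", "resume", "status", "status --json --verbose".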
360 def __processSocketMessage(
361 self, msg: SocketMessageToServer
362 ) -> SocketMessageToClient:
363 """Handler that process specified msg received on unix socket and returns the
364 answer to send"""
365 reply: SocketMessageToClient | None = None
367 try:
368 args = self.__parser.parse_args(msg.argv)
369 if "func" not in args:
370 raise SocketParsingMessage(self.__parser.format_help())
371 except (SocketParsingError, SocketParsingMessage) as e:
372 retmsg = str(e)
373 except argparse.ArgumentError as e:
374 retmsg = self.__parser.format_error(str(e))
375 else:
376 try:
377 reply = args.func(args)
378 except Exception as e:
379 lines = traceback.format_exception(type(e), e, e.__traceback__)
380 trace = "".join(lines).strip()
381 __hermes__.logger.critical(f"Unhandled exception: {trace}")
382 retmsg = trace
384 if reply is None: # An error occurred
385 reply = SocketMessageToClient(retcode=1, retmsg=retmsg)
387 return reply
389 def __sock_quit(self, args: argparse.Namespace) -> SocketMessageToClient:
390 """Handler called when quit subcommand is requested on unix socket"""
391 self.__isStopped = True
392 __hermes__.logger.info(f"{self.__config['appname']} has been requested to quit")
393 return SocketMessageToClient(retcode=0, retmsg="")
395 def __sock_pause(self, args: argparse.Namespace) -> SocketMessageToClient:
396 """Handler called when pause subcommand is requested on unix socket"""
397 if self.__isStopped:
398 return SocketMessageToClient(
399 retcode=1,
400 retmsg=f"Error: {self.__config['appname']} is currently being stopped",
401 )
403 if self.__isPaused:
404 return SocketMessageToClient(
405 retcode=1,
406 retmsg=f"Error: {self.__config['appname']} is already paused",
407 )
409 __hermes__.logger.info(
410 f"{self.__config['appname']} has been requested to pause"
411 )
412 self.__isPaused = datetime.now()
413 return SocketMessageToClient(retcode=0, retmsg="")
415 def __sock_resume(self, args: argparse.Namespace) -> SocketMessageToClient:
416 """Handler called when resume subcommand is requested on unix socket"""
417 if self.__isStopped:
418 return SocketMessageToClient(
419 retcode=1,
420 retmsg=f"Error: {self.__config['appname']} is currently being stopped",
421 )
423 if not self.__isPaused:
424 return SocketMessageToClient(
425 retcode=1, retmsg=f"Error: {self.__config['appname']} is not paused"
426 )
428 __hermes__.logger.info(
429 f"{self.__config['appname']} has been requested to resume"
430 )
431 self.__isPaused = None
432 return SocketMessageToClient(retcode=0, retmsg="")
434 def __sock_status(self, args: argparse.Namespace) -> SocketMessageToClient:
435 """Handler called when status subcommand is requested on unix socket"""
436 status = self.__status(verbose=args.verbose)
437 if args.json:
438 msg = json.dumps(status, cls=JSONEncoder, indent=4)
439 else:
440 nl = "\n"
441 info2printable = {}
442 msg = ""
443 for objname in [self.__config["appname"]] + list(
444 status.keys() - (self.__config["appname"],)
445 ):
446 infos = status[objname]
447 msg += f"{objname}:{nl}"
448 for category in ("information", "warning", "error"):
449 if category not in infos:
450 continue
451 if not infos[category]:
452 msg += f" * {category.capitalize()}: []{nl}"
453 continue
455 msg += f" * {category.capitalize()}{nl}"
456 for infoname, infodata in infos[category].items():
457 indentedinfodata = str(infodata).replace("\n", "\n ")
458 msg += (
459 f" - {info2printable.get(infoname, infoname)}:"
460 f" {indentedinfodata}{nl}"
461 )
462 msg = msg.rstrip()
464 return SocketMessageToClient(retcode=0, retmsg=msg)
466 @property
467 def currentStep(self) -> int:
468 """Step number of current event processed.
469 Allow clients to resume an event where it has failed"""
470 return self.__currentStep
472 @currentStep.setter
473 def currentStep(self, value: int):
474 if type(value) is not int:
475 raise TypeError(
476 f"Specified step {value=} has invalid type '{type(value)}'"
477 " instead of int"
478 )
480 if value < 0:
481 raise ValueError(f"Specified step {value=} must be greater than or equal to 0")
483 self.__currentStep = value
485 @property
486 def isPartiallyProcessed(self) -> bool:
487 """Indicate if some data has been processed during current event processing.
488 Required to handle autoremediation properly"""
489 return self.__isPartiallyProcessed
491 @isPartiallyProcessed.setter
492 def isPartiallyProcessed(self, value: bool):
493 if type(value) is not bool:
494 raise TypeError(
495 f"Specified isPartiallyProcessed {value=} has invalid type"
496 f" '{type(value)}' instead of bool"
497 )
499 self.__isPartiallyProcessed = value
501 @property
502 def isAnErrorRetry(self) -> bool:
503 """Read-only attribute that indicates to handler whether the current event is
504 being processed as part of an error retry"""
505 return self.__isAnErrorRetry
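# Illustrative sketch (not part of the source): a subclass handler can rely on
# currentStep and isPartiallyProcessed to make multi-step processing resumable.
# "Users" and the step bodies below are hypothetical:
#
#     def on_Users_added(self, objkey, eventattrs, newobj):
#         if self.currentStep == 0:
#             # ... create the account ...
#             self.isPartiallyProcessed = True
#             self.currentStep = 1
#         if self.currentStep == 1:
#             # ... set up group memberships ...
#             self.currentStep = 2
#
# If a step raises, the event is appended to the error queue with the current step
# and isPartiallyProcessed values, and the next retry resumes from the failed step.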
507 def mainLoop(self):
508 """Client main loop"""
509 self.__startTime = datetime.now()
511 self.__checkDatamodelWarnings()
512 # TODO: implement a check to ensure the data types and attributes required
513 # by subclasses exist in the datamodel
515 if self.__datamodel.hasRemoteSchema():
516 __hermes__.logger.debug(
517 "Remote Dataschema in cache:"
518 f" {self.__datamodel.remote_schema.to_json()=}"
519 )
520 self.__datamodel.loadErrorQueue()
521 else:
522 __hermes__.logger.debug("No remote Dataschema in cache yet")
524 if self.__sock is not None:
525 self.__sock.startProcessMessagesDaemon(appname=__hermes__.appname)
527 # Reduce sleep duration during functional tests to speed them up
528 sleepDuration = 1 if self.__numberOfLoopToProcess is None else 0.05
530 isFirstLoopIteration: bool = True
531 while not self.__isStopped:
532 self.__saveRequired = False
534 try:
535 with self.__msgbus:
536 if self.__isPaused or self.__numberOfLoopToProcess == 0:
537 sleep(sleepDuration)
538 continue
540 try:
541 if self.__hasAlreadyBeenInitialized():
542 if isFirstLoopIteration:
543 try:
544 self.__processDatamodelUpdate()
545 except Exception as e:
546 self.__notifyFatalException(
547 HermesClientHandlerError.exceptionToString(
548 e, purgeCurrentFileFromTrace=False
549 )
550 )
551 raise HermesAlreadyNotifiedException
552 self.__retryErrorQueue()
553 self.__emptyTrashBin()
554 self.__processEvents(isInitSync=False)
555 else:
556 __hermes__.logger.info(
557 "Client hasn't ran its first initsync sequence yet"
558 )
559 if self.__canBeInitialized():
560 __hermes__.logger.info(
561 "First initsync sequence processing begins"
562 )
563 self.__processEvents(isInitSync=True)
564 if self.__hasAlreadyBeenInitialized():
565 __hermes__.logger.info(
566 "First initsync sequence processing completed"
567 )
568 else:
569 __hermes__.logger.info(
570 "No initsync sequence is available on message bus."
571 " Retry..."
572 )
574 self.__notifyException(None)
576 except HermesAlreadyNotifiedException:
577 pass
578 except InvalidDataError as e:
579 self.__notifyFatalException(
580 HermesClientHandlerError.exceptionToString(
581 e, purgeCurrentFileFromTrace=False
582 )
583 )
584 except Exception as e:
585 self.__notifyException(
586 HermesClientHandlerError.exceptionToString(
587 e, purgeCurrentFileFromTrace=False
588 )
589 )
590 finally:
591 isFirstLoopIteration = False
592 # Could still be True if an exception was raised in
593 # __retryErrorQueue()
594 self.__isAnErrorRetry = False
595 except Exception as e:
596 __hermes__.logger.warning(
597 "Message bus seems to be unavailable."
598 " Waiting 60 seconds before retrying"
599 )
600 self.__notifyException(
601 HermesClientHandlerError.exceptionToString(
602 e, purgeCurrentFileFromTrace=False
603 )
604 )
605 # Wait one second 60 times to avoid waiting too long before stopping
606 for i in range(60):
607 if self.__isStopped:
608 break
609 sleep(1)
610 finally:
611 if self.__saveRequired or self.__isStopped:
612 self.__datamodel.saveErrorQueue()
613 if self.__hasAtLeastBeganInitialization():
614 self.__datamodel.saveLocalAndRemoteData()
615 # Reset current step for "on_save" event
616 self.currentStep = 0
617 self.isPartiallyProcessed = False
618 # Call special event "on_save()"
619 self.__callHandler("", "save")
620 self.__notifyQueueErrors()
621 self.__cache.savecachefile()
623 if self.__isStopped:
624 # Only to ensure the cache files' version update is saved,
625 # avoiding version migrations at each restart,
626 # as those files aren't expected to be updated often
627 self.__config.savecachefile()
628 self.__datamodel.remote_schema.savecachefile()
630 # Only used in functional tests
631 if self.__numberOfLoopToProcess:
632 self.__numberOfLoopToProcess -= 1
634 def __retryErrorQueue(self):
635 # Enforce retryInterval
636 now = datetime.now()
637 if now < self.__errorQueue_lastretry + self.__errorQueue_retryInterval:
638 return # Too early to process again
640 # All events processed in __retryErrorQueue() require this attribute to be True
641 self.__isAnErrorRetry = True
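# The loop below retries queued events until the queue stabilizes: events skipped
# because they are still a dependency of another error are collected in retryQueue,
# and are retried again as long as at least one other event was successfully
# processed (i.e. the set of queue keys changed) and skipped events remain.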
643 done = False
644 evNumbersToRetry: list[int] = []
645 eventNumber: int
646 localEvent: Event
647 remoteEvent: Event | None
648 while not done:
649 retryQueue: list[int] = []
650 previousKeys = self.__datamodel.errorqueue.keys()
652 for (
653 eventNumber,
654 remoteEvent,
655 localEvent,
656 errorMsg,
657 ) in self.__datamodel.errorqueue:
658 if evNumbersToRetry and eventNumber not in evNumbersToRetry:
659 # Ignore any eventNumber absent from evNumbersToRetry,
660 # except on the first iteration of the while loop
661 continue
663 if remoteEvent is not None:
664 if self.__datamodel.errorqueue.isEventAParentOfAnotherError(
665 remoteEvent, False
666 ):
667 __hermes__.logger.info(
668 f"Won't retry remote event {remoteEvent} from error queue"
669 " as it is still a dependency of another error"
670 )
671 retryQueue.append(eventNumber)
672 continue
674 __hermes__.logger.info(
675 f"Retrying to process remote event {remoteEvent} from error"
676 " queue"
677 )
678 try:
679 self.__processRemoteEvent(
680 remoteEvent, localEvent, enqueueEventWithError=False
681 )
682 except HermesClientHandlerError as e:
683 __hermes__.logger.info(
684 f"... failed on step {self.currentStep}: {str(e)}"
685 )
686 remoteEvent.step = self.currentStep
687 remoteEvent.isPartiallyProcessed = self.isPartiallyProcessed
688 localEvent.step = self.currentStep
689 localEvent.isPartiallyProcessed = self.isPartiallyProcessed
690 self.__datamodel.errorqueue.updateErrorMsg(eventNumber, e.msg)
691 else:
692 # If the event's object has been suppressed, eventNumber has already
693 # been purged from the queue
694 self.__datamodel.errorqueue.remove(
695 eventNumber, ignoreMissingEventNumber=True
696 )
697 else:
698 if self.__datamodel.errorqueue.isEventAParentOfAnotherError(
699 localEvent, True
700 ):
701 __hermes__.logger.info(
702 f"Won't retry local event {localEvent} from error queue"
703 " as it is still a dependency of another error"
704 )
705 retryQueue.append(eventNumber)
706 continue
707 __hermes__.logger.info(
708 f"Retrying to process local event {localEvent} from error queue"
709 )
710 try:
711 self.__processLocalEvent(
712 remoteEvent, localEvent, enqueueEventWithError=False
713 )
714 except HermesClientHandlerError as e:
715 __hermes__.logger.info(
716 f"... failed on step {self.currentStep}: {str(e)}"
717 )
718 localEvent.step = self.currentStep
719 localEvent.isPartiallyProcessed = self.isPartiallyProcessed
720 self.__datamodel.errorqueue.updateErrorMsg(eventNumber, e.msg)
721 else:
722 # If the event's object has been suppressed, eventNumber has already
723 # been purged from the queue
724 self.__datamodel.errorqueue.remove(
725 eventNumber, ignoreMissingEventNumber=True
726 )
727 while self.__isPaused and not self.__isStopped:
728 sleep(1) # Allow loop to be paused
729 if self.__isStopped:
730 break # Allow loop to be interrupted if requested
731 else:
732 # Update __errorQueue_lastretry only if loop hasn't been interrupted
733 self.__errorQueue_lastretry = now
735 done = previousKeys == self.__datamodel.errorqueue.keys() or not retryQueue
736 if done:
737 if previousKeys:
738 __hermes__.logger.debug(
739 f"End of retryerrorqueue {previousKeys=}"
740 f" {self.__datamodel.errorqueue.keys()=} - {retryQueue=}"
741 )
742 else:
743 __hermes__.logger.debug(
744 "As some event have been processed, will retry ignored events"
745 f" {retryQueue}"
746 )
747 evNumbersToRetry = retryQueue.copy()
749 self.__isAnErrorRetry = False # End of __retryErrorQueue()
751 def __emptyTrashBin(self, force: bool = False):
752 # Enforce purgeInterval
753 now = datetime.now()
754 if (
755 not force
756 and now < self.__trashbin_lastpurge + self.__trashbin_purgeInterval
757 ):
758 return # Too early to process again
759 if self.__trashbin_retention is not None:
760 retentionLimit = datetime.now() - self.__trashbin_retention
762 objtype: str
763 objs: DataObjectList
764 # As we'll remove objects, process data in reverse datamodel declaration
765 # order
766 for objtype, objs in reversed(self.__datamodel.remotedata.items()):
767 if not objtype.startswith("trashbin_"):
768 continue
770 for pkey in objs.getPKeys():
771 obj = objs.get(pkey)
772 if (
773 self.__trashbin_retention is None
774 or obj._trashbin_timestamp < retentionLimit
775 ):
776 event = Event(
777 evcategory="base", eventtype="removed", obj=obj, objattrs={}
778 )
779 __hermes__.logger.info(f"Trying to purge {repr(obj)} from trashbin")
780 if self.__datamodel.errorqueue.containsObjectByEvent(
781 event, isLocalEvent=False
782 ) and not self.__datamodel.errorqueue.isEventAParentOfAnotherError(
783 event, isLocalEvent=False
784 ):
785 try:
786 self.__processRemoteEvent(
787 event, local_event=None, enqueueEventWithError=False
788 )
789 except HermesClientHandlerError:
790 pass
791 else:
792 self.__processRemoteEvent(
793 event, local_event=None, enqueueEventWithError=True
794 )
796 while self.__isPaused and not self.__isStopped:
797 sleep(1) # Allow loop to be paused
798 if self.__isStopped:
799 break # Allow loop to be interrupted if requested
801 while self.__isPaused and not self.__isStopped:
802 sleep(1) # Allow loop to be paused
803 if self.__isStopped:
804 break # Allow loop to be interrupted if requested
805 else:
806 # Update __trashbin_lastpurge only if loop hasn't been interrupted
807 self.__trashbin_lastpurge = now
809 def __hasAlreadyBeenInitialized(self) -> bool:
810 if (
811 self.__cache.initstartoffset is None
812 or self.__cache.initstopoffset is None
813 or self.__cache.nextoffset is None
814 or self.__cache.nextoffset < self.__cache.initstopoffset
815 ):
816 return False
817 return True
819 def __hasAtLeastBeganInitialization(self) -> bool:
820 return (
821 self.__cache.initstartoffset is not None
822 and self.__datamodel.hasRemoteSchema()
823 )
825 def __canBeInitialized(self) -> bool:
826 self.__msgbus.seekToBeginning()
828 # List of (start, stop) offsets of initsync sequences found
829 initSyncFound: list[tuple[Any, Any]] = []
831 start = None
832 stop = None
833 event: Event
834 for event in self.__msgbus:
835 if event.evcategory != "initsync":
836 continue
837 if event.eventtype == "init-start":
838 start = event.offset
839 elif event.eventtype == "init-stop" and start is not None:
840 stop = event.offset
841 initSyncFound.append(
842 (start, stop),
843 )
844 if self.__useFirstInitsyncSequence:
845 break # We found the first sequence
846 else:
847 # Continue to find a new complete sequence
848 start = None
849 stop = None
851 if not initSyncFound:
852 return False
854 if self.__useFirstInitsyncSequence:
855 start, stop = initSyncFound[0]
856 else:
857 start, stop = initSyncFound[-1]
859 if self.__cache.nextoffset is None or self.__cache.nextoffset < start:
860 self.__cache.nextoffset = start
862 self.__cache.initstartoffset = start
863 self.__cache.initstopoffset = stop
864 __hermes__.logger.debug(
865 "Init sequence was found in Kafka at offsets"
866 f" [{self.__cache.initstartoffset} ; {self.__cache.initstopoffset}]"
867 )
868 return True
870 def __updateSchema(self, newSchema: Dataschema):
871 if self.__datamodel.forcePurgeOfTrashedObjectsWithoutNewPkeys(
872 self.__datamodel.remote_schema, newSchema
873 ):
874 self.__emptyTrashBin(force=True)
876 self.__datamodel.updateSchema(newSchema)
877 self.__config.savecachefile() # Save config to be able to rebuild datamodel
878 # Save and reload the error queue to purge it of events of any suppressed types
879 self.__datamodel.saveErrorQueue()
880 self.__datamodel.loadErrorQueue()
881 self.__checkDatamodelWarnings()
883 def __checkDatamodelWarnings(self):
884 if self.__datamodel.unknownRemoteTypes:
885 __hermes__.logger.warning(
886 "Datamodel errors: remote types"
887 f" '{self.__datamodel.unknownRemoteTypes}'"
888 " don't exist in current Dataschema"
889 )
891 if self.__datamodel.unknownRemoteAttributes:
892 __hermes__.logger.warning(
893 "Datamodel errors: remote attributes don't exist in current"
894 f" Dataschema: {self.__datamodel.unknownRemoteAttributes}"
895 )
896 self.__notifyDatamodelWarnings()
898 def __processEvents(self, isInitSync=False):
899 remote_event: Event
900 schema: Dataschema | None = None
901 evcategory: str = "initsync" if isInitSync else "base"
903 self.__msgbus.seek(self.__cache.nextoffset)
905 for remote_event in self.__msgbus:
906 self.__saveRequired = True
907 if isInitSync and remote_event.offset > self.__cache.initstopoffset:
908 # Should never be reached
909 self.__cache.nextoffset = remote_event.offset + 1
910 break
912 # TODO: implement data consistency check if event.evcategory==initsync
913 # and evcategory==base
915 if remote_event.evcategory != evcategory:
916 self.__cache.nextoffset = remote_event.offset + 1
917 continue
919 if isInitSync:
920 if remote_event.eventtype == "init-start":
921 schema = Dataschema.from_json(remote_event.objattrs)
922 self.__updateSchema(schema)
923 continue
925 if remote_event.eventtype == "init-stop":
926 self.__cache.nextoffset = remote_event.offset + 1
927 break
929 if schema is None and not self.__hasAtLeastBeganInitialization():
930 msg = "Invalid initsync sequence met, ignoring"
931 __hermes__.logger.critical(msg)
932 return
934 # Process "standard" message
935 match remote_event.eventtype:
936 case "added" | "modified" | "removed":
937 self.__processRemoteEvent(
938 remote_event, local_event=None, enqueueEventWithError=True
939 )
940 case "dataschema":
941 schema = Dataschema.from_json(remote_event.objattrs)
942 self.__updateSchema(schema)
943 case _:
944 __hermes__.logger.error(
945 "Received an event with unknown type"
946 f" '{remote_event.eventtype}': ignored"
947 )
949 self.__cache.nextoffset = remote_event.offset + 1
951 while self.__isPaused and not self.__isStopped:
952 sleep(1) # Allow loop to be paused
953 if self.__isStopped:
954 break # Allow loop to be interrupted if requested
956 def __processRemoteEvent(
957 self,
958 remote_event: Event | None,
959 local_event: Event | None,
960 enqueueEventWithError: bool,
961 simulateOnly: bool = False,
962 ):
963 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
964 remote_event.objtype
965 )
966 __hermes__.logger.debug(
967 f"__processRemoteEvent({remote_event.toString(secretAttrs)})"
968 )
969 self.__saveRequired = True
971 # In case of modification, try to compose the full object in order to provide
972 # all attribute values needed to render Jinja templates that use several variables.
973 # In this specific case, one of the template variables may have been modified,
974 # but not the others, so the event attrs alone are not enough to render the
975 # template
976 r_obj_complete: DataObject | None = None
977 if remote_event.eventtype == "modified":
978 cache_complete, r_cachedobj_complete = (
979 Datamodel.getObjectFromCacheOrTrashbin(
980 self.__datamodel.remotedata_complete,
981 remote_event.objtype,
982 remote_event.objpkey,
983 )
984 )
985 if r_cachedobj_complete is not None:
986 r_obj_complete = Datamodel.getUpdatedObject(
987 r_cachedobj_complete, remote_event.objattrs
988 )
990 # Should always be None, except when called from __retryErrorQueue().
991 # In that case, we have to use the provided local_event, as it may contain
992 # some extra changes stacked by autoremediation
993 if local_event is None:
994 local_event = self.__datamodel.convertEventToLocal(
995 remote_event, r_obj_complete
996 )
997 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"]
999 if not simulateOnly and enqueueEventWithError:
1000 hadErrors = self.__datamodel.errorqueue.containsObjectByEvent(
1001 remote_event, isLocalEvent=False
1002 )
1003 isParent = self.__datamodel.errorqueue.isEventAParentOfAnotherError(
1004 remote_event, isLocalEvent=False
1005 )
1006 if hadErrors or (
1007 isParent and remote_event.eventtype in self.__foreignkeys_events
1008 ):
1009 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1010 remote_event.objtype
1011 )
1012 if hadErrors:
1013 errorMsg = (
1014 f"Object in remote event {remote_event.toString(secretAttrs)}"
1015 " already had unresolved errors: appending event to error queue"
1016 )
1017 else:
1018 errorMsg = (
1019 f"Object in remote event {remote_event.toString(secretAttrs)}"
1020 " is a dependency of an object that already had unresolved"
1021 " errors: appending event to error queue"
1022 )
1023 __hermes__.logger.warning(errorMsg)
1024 self.__processRemoteEvent(
1025 remote_event,
1026 local_event=None,
1027 enqueueEventWithError=False,
1028 simulateOnly=True,
1029 )
1030 if local_event is None:
1031 # Force empty event generation when local_event doesn't change
1032 # anything
1033 local_event = self.__datamodel.convertEventToLocal(
1034 remote_event, r_obj_complete, allowEmptyEvent=True
1035 )
1036 self.__datamodel.errorqueue.append(remote_event, local_event, errorMsg)
1037 return
1039 try:
1040 match remote_event.eventtype:
1041 case "added":
1042 if (
1043 self.__trashbin_retention is not None
1044 and remote_event.objpkey in trashbin
1045 ):
1046 # Object is in trashbin, recycle it
1047 self.__remoteRecycled(remote_event, local_event, simulateOnly)
1048 else:
1049 # Add new object
1050 self.__remoteAdded(remote_event, local_event, simulateOnly)
1052 case "modified":
1053 self.__remoteModified(remote_event, local_event, simulateOnly)
1055 case "removed":
1056 # Remove object on any of these conditions:
1057 # - trashbin retention is disabled
1058 # - object is already in trashbin
1059 if (
1060 self.__trashbin_retention is None
1061 or remote_event.objpkey in trashbin
1062 ):
1063 # Remove object
1064 self.__remoteRemoved(remote_event, local_event, simulateOnly)
1065 else:
1066 # Store object in trashbin
1067 self.__remoteTrashed(remote_event, local_event, simulateOnly)
1068 except HermesClientHandlerError as e:
1069 if not simulateOnly and enqueueEventWithError:
1070 self.__processRemoteEvent(
1071 remote_event,
1072 local_event=None,
1073 enqueueEventWithError=False,
1074 simulateOnly=True,
1075 )
1076 remote_event.step = self.currentStep
1077 remote_event.isPartiallyProcessed = self.isPartiallyProcessed
1078 local_event.step = self.currentStep
1079 local_event.isPartiallyProcessed = self.isPartiallyProcessed
1081 if local_event is None:
1082 # Force empty event generation when local_event doesn't change
1083 # anything
1084 local_event = self.__datamodel.convertEventToLocal(
1085 remote_event, r_obj_complete, allowEmptyEvent=True
1086 )
1087 self.__datamodel.errorqueue.append(remote_event, local_event, e.msg)
1088 else:
1089 raise
1091 def __processLocalEvent(
1092 self,
1093 remote_event: Event | None,
1094 local_event: Event | None,
1095 enqueueEventWithError: bool,
1096 simulateOnly: bool = False,
1097 ):
1098 if local_event is None:
1099 __hermes__.logger.debug("__processLocalEvent(None)")
1100 return
1102 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1103 local_event.objtype
1104 )
1105 __hermes__.logger.debug(
1106 f"__processLocalEvent({local_event.toString(secretAttrs)})"
1107 )
1109 self.__saveRequired = True
1111 if not simulateOnly:
1112 # Reset current step
1113 self.currentStep = local_event.step
1114 self.isPartiallyProcessed = local_event.isPartiallyProcessed
1116 if not simulateOnly and enqueueEventWithError:
1117 hadErrors = self.__datamodel.errorqueue.containsObjectByEvent(
1118 local_event, isLocalEvent=True
1119 )
1120 isParent = self.__datamodel.errorqueue.isEventAParentOfAnotherError(
1121 local_event, isLocalEvent=True
1122 )
1123 if hadErrors or (
1124 isParent and local_event.eventtype in self.__foreignkeys_events
1125 ):
1126 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1127 local_event.objtype
1128 )
1129 if hadErrors:
1130 errorMsg = (
1131 f"Object in local event {local_event.toString(secretAttrs)}"
1132 " already had unresolved errors: appending event to error queue"
1133 )
1134 else:
1135 errorMsg = (
1136 f"Object in local event {local_event.toString(secretAttrs)}"
1137 " is a dependency of an object that already had unresolved"
1138 " errors: appending event to error queue"
1139 )
1140 __hermes__.logger.warning(errorMsg)
1141 self.__processLocalEvent(
1142 None, local_event, enqueueEventWithError=False, simulateOnly=True
1143 )
1144 self.__datamodel.errorqueue.append(remote_event, local_event, errorMsg)
1145 return
1147 trashbin = self.__datamodel.localdata[f"trashbin_{local_event.objtype}"]
1148 try:
1149 match local_event.eventtype:
1150 case "added":
1151 if (
1152 self.__trashbin_retention is not None
1153 and local_event.objpkey in trashbin
1154 ):
1155 # Object is in trashbin, recycle it
1156 self.__localRecycled(local_event, simulateOnly)
1157 else:
1158 self.__localAdded(local_event, simulateOnly)
1159 case "modified":
1160 if not simulateOnly and local_event.objpkey in trashbin:
1161 # Object is in trashbin, and cannot be modified until it is
1162 # restored
1163 if enqueueEventWithError:
1164 # As the object's changes will be processed when it is restored,
1165 # ignore the change
1166 self.__processLocalEvent(
1167 None,
1168 local_event,
1169 enqueueEventWithError=False,
1170 simulateOnly=True,
1171 )
1172 else:
1173 # Propagate error as requested
1174 raise HermesClientHandlerError(
1175 f"Object of event {repr(local_event)} is in trashbin,"
1176 " and cannot be modified until it is restored"
1177 )
1178 else:
1179 self.__localModified(local_event, simulateOnly)
1180 case "removed":
1181 # Remove object on any of these conditions:
1182 # - trashbin retention is disabled
1183 # - object is already in trashbin
1184 if (
1185 self.__trashbin_retention is None
1186 or local_event.objpkey in trashbin
1187 ):
1188 self.__localRemoved(local_event, simulateOnly)
1189 else:
1190 self.__localTrashed(local_event, simulateOnly)
1191 except HermesClientHandlerError as e:
1192 if not simulateOnly and enqueueEventWithError:
1193 self.__processLocalEvent(
1194 None, local_event, enqueueEventWithError=False, simulateOnly=True
1195 )
1196 if remote_event is not None:
1197 remote_event.step = self.currentStep
1198 remote_event.isPartiallyProcessed = self.isPartiallyProcessed
1199 local_event.step = self.currentStep
1200 local_event.isPartiallyProcessed = self.isPartiallyProcessed
1201 self.__datamodel.errorqueue.append(remote_event, local_event, e.msg)
1202 else:
1203 raise
1205 def __remoteAdded(
1206 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1207 ):
1208 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1209 remote_event.objtype
1210 )
1211 __hermes__.logger.debug(f"__remoteAdded({remote_event.toString(secretAttrs)})")
1213 r_obj = self.__datamodel.createRemoteDataobject(
1214 remote_event.objtype, remote_event.objattrs
1215 )
1216 self.__processLocalEvent(
1217 remote_event,
1218 local_event,
1219 enqueueEventWithError=False,
1220 simulateOnly=simulateOnly,
1221 )
1223 # Add remote object to cache
1224 if not simulateOnly:
1225 self.__datamodel.remotedata[remote_event.objtype].append(r_obj)
1226 # May already have been added if the current event comes from the error queue
1227 if r_obj not in self.__datamodel.remotedata_complete[remote_event.objtype]:
1228 self.__datamodel.remotedata_complete[remote_event.objtype].append(r_obj)
1230 def __localAdded(self, local_ev: Event, simulateOnly: bool = False):
1231 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1232 local_ev.objtype
1233 )
1234 __hermes__.logger.debug(f"__localAdded({local_ev.toString(secretAttrs)})")
1236 l_obj = self.__datamodel.createLocalDataobject(
1237 local_ev.objtype, local_ev.objattrs
1238 )
1240 if not simulateOnly:
1241 # Call added handler
1242 self.__callHandler(
1243 objtype=local_ev.objtype,
1244 eventtype="added",
1245 objkey=local_ev.objpkey,
1246 eventattrs=local_ev.objattrs,
1247 newobj=deepcopy(l_obj),
1248 )
1250 # Add local object to cache
1251 if not simulateOnly:
1252 self.__datamodel.localdata[local_ev.objtype].append(l_obj)
1253 # May already have been added if the current event comes from the error queue
1254 if l_obj not in self.__datamodel.localdata_complete[local_ev.objtype]:
1255 self.__datamodel.localdata_complete[local_ev.objtype].append(l_obj)
1257 def __remoteRecycled(
1258 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1259 ):
1260 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1261 remote_event.objtype
1262 )
1263 __hermes__.logger.debug(
1264 f"__remoteRecycled({remote_event.toString(secretAttrs)})"
1265 )
1266 maincache = self.__datamodel.remotedata[remote_event.objtype]
1267 maincache_complete = self.__datamodel.remotedata_complete[remote_event.objtype]
1268 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"]
1269 trashbin_complete = self.__datamodel.remotedata_complete[
1270 f"trashbin_{remote_event.objtype}"
1271 ]
1273 r_obj = self.__datamodel.createRemoteDataobject(
1274 remote_event.objtype, remote_event.objattrs
1275 )
1277 self.__processLocalEvent(
1278 remote_event,
1279 local_event,
1280 enqueueEventWithError=False,
1281 simulateOnly=simulateOnly,
1282 )
1284 # Remove remote object from trashbin
1285 if not simulateOnly:
1286 trashbin.removeByPkey(remote_event.objpkey)
1287 trashbin_complete.removeByPkey(remote_event.objpkey)
1288 # Restore remote object, with its potential changes, in main cache
1289 if not simulateOnly:
1290 maincache.append(r_obj)
1291 # May already have been recycled if the current event comes from the error queue
1292 if r_obj not in maincache_complete:
1293 maincache_complete.append(r_obj)
1295 def __localRecycled(self, local_ev: Event, simulateOnly: bool = False):
1296 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1297 local_ev.objtype
1298 )
1299 __hermes__.logger.debug(f"__localRecycled({local_ev.toString(secretAttrs)})")
1300 maincache = self.__datamodel.localdata[local_ev.objtype]
1301 maincache_complete = self.__datamodel.localdata_complete[local_ev.objtype]
1302 trashbin = self.__datamodel.localdata[f"trashbin_{local_ev.objtype}"]
1303 trashbin_complete = self.__datamodel.localdata_complete[
1304 f"trashbin_{local_ev.objtype}"
1305 ]
1307 l_obj = self.__datamodel.createLocalDataobject(
1308 local_ev.objtype, local_ev.objattrs
1309 )
1310 l_obj_trash: DataObject = deepcopy(trashbin.get(local_ev.objpkey))
1311 del l_obj_trash._trashbin_timestamp # Remove trashbin timestamp from object
1313 l_obj_trash_complete: DataObject = deepcopy(
1314 trashbin_complete.get(local_ev.objpkey)
1315 )
1317 # May already have been recycled if the current event comes from the error queue
1318 if l_obj_trash_complete is not None:
1319 del (
1320 l_obj_trash_complete._trashbin_timestamp
1321 ) # Remove trashbin timestamp from object
1323 if not simulateOnly:
1324 # Call recycled handler
1325 self.__callHandler(
1326 objtype=local_ev.objtype,
1327 eventtype="recycled",
1328 objkey=l_obj_trash.getPKey(),
1329 eventattrs=l_obj_trash.toNative(),
1330 newobj=deepcopy(l_obj_trash),
1331 )
1333 if not simulateOnly:
1334 # Remove local object from trashbin
1335 trashbin.remove(l_obj_trash)
1336 # Restore local object in main cache
1337 maincache.append(l_obj_trash)
1339 # May already have been recycled if the current event comes from the error queue
1340 if l_obj_trash_complete is not None:
1341 trashbin_complete.remove(l_obj_trash_complete)
1342 maincache_complete.append(l_obj_trash_complete)
1344 diff = l_obj.diffFrom(l_obj_trash) # Handle local object changes if any
1345 if diff and not simulateOnly:
1346 (event, obj) = Event.fromDiffItem(
1347 diffitem=diff,
1348 eventCategory=local_ev.evcategory,
1349 changeType="modified",
1350 )
1351 # Hack: we pass this second event (modified) to the error queue in order to
1352 # postpone its processing until all caches of the previous one are up to date.
1353 # Otherwise, if an error occurs on this second event (modified), we would try
1354 # to reprocess the first one (recycled)
1355 self.__datamodel.errorqueue.append(
1356 remoteEvent=None, localEvent=event, errorMsg=None
1357 )
1358 # ... and force error queue to be retried asap in order to process the
1359 # pending event
1360 self.__errorQueue_lastretry = datetime(year=1, month=1, day=1)
1362 def __remoteModified(
1363 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1364 ):
1365 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1366 remote_event.objtype
1367 )
1368 __hermes__.logger.debug(
1369 f"__remoteModified({remote_event.toString(secretAttrs)})"
1370 )
1371 maincache = self.__datamodel.remotedata[remote_event.objtype]
1373 if not simulateOnly:
1374 r_cachedobj: DataObject = maincache.get(remote_event.objpkey)
1375 r_obj = Datamodel.getUpdatedObject(r_cachedobj, remote_event.objattrs)
1377 cache_complete, r_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin(
1378 self.__datamodel.remotedata_complete,
1379 remote_event.objtype,
1380 remote_event.objpkey,
1381 )
1382 r_obj_complete = Datamodel.getUpdatedObject(
1383 r_cachedobj_complete, remote_event.objattrs
1384 )
1386 self.__processLocalEvent(
1387 remote_event,
1388 local_event,
1389 enqueueEventWithError=False,
1390 simulateOnly=simulateOnly,
1391 )
1393 # Update remote object in cache
1394 if not simulateOnly:
1395 maincache.replace(r_obj)
1397 # May not exist
1398 if cache_complete is not None:
1399 cache_complete.replace(r_obj_complete)
1401 def __localModified(self, local_ev: Event, simulateOnly: bool = False):
1402 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1403 local_ev.objtype
1404 )
1405 __hermes__.logger.debug(f"__localModified({local_ev.toString(secretAttrs)})")
1406 maincache = self.__datamodel.localdata[local_ev.objtype]
1408 if not simulateOnly:
1409 l_cachedobj: DataObject = maincache.get(local_ev.objpkey)
1410 l_obj = Datamodel.getUpdatedObject(l_cachedobj, local_ev.objattrs)
1412 cache_complete, l_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin(
1413 self.__datamodel.localdata_complete, local_ev.objtype, local_ev.objpkey
1414 )
1416 # May not exist
1417 if cache_complete is not None:
1418 l_obj_complete = Datamodel.getUpdatedObject(
1419 l_cachedobj_complete, local_ev.objattrs
1420 )
1422 if not simulateOnly:
1423 # Call modified handler
1424 self.__callHandler(
1425 objtype=local_ev.objtype,
1426 eventtype="modified",
1427 objkey=local_ev.objpkey,
1428 eventattrs=local_ev.objattrs,
1429 newobj=deepcopy(l_obj),
1430 cachedobj=deepcopy(l_cachedobj),
1431 )
1433 # Update local object in cache
1434 if not simulateOnly:
1435 maincache.replace(l_obj)
1437 # May not exist
1438 if cache_complete is not None:
1439 cache_complete.replace(l_obj_complete)
1441 def __remoteTrashed(
1442 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1443 ):
1444 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1445 remote_event.objtype
1446 )
1447 __hermes__.logger.debug(
1448 f"__remoteTrashed({remote_event.toString(secretAttrs)})"
1449 )
1450 maincache = self.__datamodel.remotedata[remote_event.objtype]
1451 maincache_complete = self.__datamodel.remotedata_complete[remote_event.objtype]
1452 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"]
1453 trashbin_complete = self.__datamodel.remotedata_complete[
1454 f"trashbin_{remote_event.objtype}"
1455 ]
1457 r_cachedobj: DataObject = maincache.get(remote_event.objpkey)
1458 r_cachedobj_complete: DataObject = maincache_complete.get(remote_event.objpkey)
1460 self.__processLocalEvent(
1461 remote_event,
1462 local_event,
1463 enqueueEventWithError=False,
1464 simulateOnly=simulateOnly,
1465 )
1467 if not simulateOnly:
1468 # Remove remote object from cache
1469 maincache.remove(r_cachedobj)
1470 r_cachedobj._trashbin_timestamp = remote_event.timestamp
1471 # Add remote object to trashbin
1472 trashbin.append(r_cachedobj)
1474 if r_cachedobj_complete is not None:
1475 maincache_complete.remove(r_cachedobj_complete)
1476 r_cachedobj_complete._trashbin_timestamp = remote_event.timestamp
1477 trashbin_complete.append(r_cachedobj_complete)
1479 def __localTrashed(self, local_ev: Event, simulateOnly: bool = False):
1480 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1481 local_ev.objtype
1482 )
1483 __hermes__.logger.debug(f"__localTrashed({local_ev.toString(secretAttrs)})")
1484 maincache = self.__datamodel.localdata[local_ev.objtype]
1485 maincache_complete = self.__datamodel.localdata_complete[local_ev.objtype]
1486 trashbin = self.__datamodel.localdata[f"trashbin_{local_ev.objtype}"]
1487 trashbin_complete = self.__datamodel.localdata_complete[
1488 f"trashbin_{local_ev.objtype}"
1489 ]
1491 l_cachedobj: DataObject = maincache.get(local_ev.objpkey)
1492 l_cachedobj_complete: DataObject = maincache_complete.get(local_ev.objpkey)
1494 if not simulateOnly:
1495 # Call trashed handler
1496 self.__callHandler(
1497 objtype=local_ev.objtype,
1498 eventtype="trashed",
1499 objkey=local_ev.objpkey,
1500 eventattrs=local_ev.objattrs,
1501 cachedobj=deepcopy(l_cachedobj),
1502 )
1504 if not simulateOnly:
1505 # Remove local object from cache
1506 maincache.remove(l_cachedobj)
1507 l_cachedobj._trashbin_timestamp = local_ev.timestamp
1508 # Add local object to trashbin
1509 trashbin.append(l_cachedobj)
1511 if l_cachedobj_complete is not None:
1512 maincache_complete.remove(l_cachedobj_complete)
1513 l_cachedobj_complete._trashbin_timestamp = local_ev.timestamp
1514 trashbin_complete.append(l_cachedobj_complete)
1516 def __remoteRemoved(
1517 self, remote_event: Event, local_event: Event, simulateOnly: bool = False
1518 ):
1519 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf(
1520 remote_event.objtype
1521 )
1522 __hermes__.logger.debug(
1523 f"__remoteRemoved({remote_event.toString(secretAttrs)})"
1524 )
1526 cache, r_cachedobj = Datamodel.getObjectFromCacheOrTrashbin(
1527 self.__datamodel.remotedata,
1528 remote_event.objtype,
1529 remote_event.objpkey,
1530 )
1532 cache_complete, r_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin(
1533 self.__datamodel.remotedata_complete,
1534 remote_event.objtype,
1535 remote_event.objpkey,
1536 )
1538 self.__processLocalEvent(
1539 remote_event,
1540 local_event,
1541 enqueueEventWithError=False,
1542 simulateOnly=simulateOnly,
1543 )
1545 # Remove remote object from cache or trashbin
1546 if not simulateOnly:
1547 cache.remove(r_cachedobj)
1549 # May already have been removed if the current event comes from the error queue
1550 if cache_complete is not None:
1551 cache_complete.remove(r_cachedobj_complete)
1553 if not simulateOnly:
1554 # Remove any events relative to the current object from the error queue
1555 self.__datamodel.errorqueue.purgeAllEventsOfDataObject(
1556 r_cachedobj, isLocalObjtype=False
1557 )
1559 def __localRemoved(self, local_ev: Event, simulateOnly: bool = False):
1560 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1561 local_ev.objtype
1562 )
1563 __hermes__.logger.debug(f"__localRemoved({local_ev.toString(secretAttrs)})")
1565 cache, l_cachedobj = Datamodel.getObjectFromCacheOrTrashbin(
1566 self.__datamodel.localdata,
1567 local_ev.objtype,
1568 local_ev.objpkey,
1569 )
1571 cache_complete, l_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin(
1572 self.__datamodel.localdata_complete,
1573 local_ev.objtype,
1574 local_ev.objpkey,
1575 )
1577 if not simulateOnly:
1578 # Call removed handler
1579 self.__callHandler(
1580 objtype=local_ev.objtype,
1581 eventtype="removed",
1582 objkey=local_ev.objpkey,
1583 eventattrs=local_ev.objattrs,
1584 cachedobj=deepcopy(l_cachedobj),
1585 )
1587 # Remove local object from cache or trashbin
1588 if not simulateOnly:
1589 cache.remove(l_cachedobj)
1590 # May already have been removed if the current event comes from the error queue
1591 if cache_complete is not None:
1592 cache_complete.remove(l_cachedobj_complete)
1594 if not simulateOnly:
1595 # Remove any events relative to the current object from the error queue
1596 self.__datamodel.errorqueue.purgeAllEventsOfDataObject(
1597 l_cachedobj, isLocalObjtype=True
1598 )
1600 def __callHandler(self, objtype: str, eventtype: str, **kwargs):
1601 if not objtype:
1602 handlerName = f"on_{eventtype}"
1603 kwargs_filtered = kwargs
1604 else:
1605 handlerName = f"on_{objtype}_{eventtype}"
1607 # Filter secrets values
1608 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(objtype)
1609 kwargs_filtered = kwargs.copy()
1610 kwargs_filtered["eventattrs"] = Event.objattrsToString(
1611 kwargs["eventattrs"], secretAttrs
1612 )
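# Handler resolution example (illustrative): a "modified" event on a local type
# named "Users" (hypothetical) resolves to on_Users_modified(); the special "save"
# event, called with an empty objtype, resolves to on_save().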
1614 kwargsstr = ", ".join([f"{k}={repr(v)}" for k, v in kwargs_filtered.items()])
1616 hdlr = getattr(self, handlerName, None)
1618 if not callable(hdlr):
1619 __hermes__.logger.debug(
1620 f"Calling '{handlerName}({kwargsstr})': handler '{handlerName}()'"
1621 " doesn't exists"
1622 )
1623 return
1625 __hermes__.logger.info(
1626 f"Calling '{handlerName}({kwargsstr})' - currentStep={self.currentStep},"
1627 f" isPartiallyProcessed={self.isPartiallyProcessed},"
1628 f" isAnErrorRetry={self.isAnErrorRetry}"
1629 )
1631 try:
1632 hdlr(**kwargs)
1633 except Exception as e:
1634 __hermes__.logger.error(
1635 f"Calling '{handlerName}({kwargsstr})': error met on step"
1636 f" {self.currentStep} '{str(e)}'"
1637 )
1638 raise HermesClientHandlerError(e)
1640 def __processDatamodelUpdate(self):
1641 """Check difference between current datamodel and previous one. If datamodel has
1642 changed, generate local events according to datamodel changes"""
1644 diff = self.__newdatamodel.diffFrom(self.__datamodel)
1646 if not diff:
1647 __hermes__.logger.info("No change in datamodel")
1648 # Start working with new datamodel
1649 self.__datamodel = self.__newdatamodel
1650 self.__datamodel.loadErrorQueue()
1651 return
1653 __hermes__.logger.info(f"Datamodel has changed: {diff.dict}")
1654 self.__saveRequired = True
1656 # Start with removed types, as processing data removal requires the previous
1657 # datamodel
1658 if diff.removed:
1659 for l_objtype in diff.removed:
1660 __hermes__.logger.info(f"About to purge data from type '{l_objtype}'")
1661 if l_objtype not in self.__datamodel.typesmapping.values():
1662 __hermes__.logger.warning(
1663 f"Requested to purge data from type '{l_objtype}', but it"
1664 " doesn't exist in previous datamodel: ignoring"
1665 )
1666 continue
1668 pkeys = (
1669 self.__datamodel.localdata[l_objtype].getPKeys()
1670 | self.__datamodel.localdata[f"trashbin_{l_objtype}"].getPKeys()
1671 )
1673 # Call remove on each object
1674 for pkey in pkeys:
1675 _, l_obj = Datamodel.getObjectFromCacheOrTrashbin(
1676 self.__datamodel.localdata, l_objtype, pkey
1677 )
1678 if l_obj:
1679 l_ev = Event(
1680 evcategory="base",
1681 eventtype="removed",
1682 obj=l_obj,
1683 objattrs={},
1684 )
1685 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(
1686 l_objtype
1687 )
1688 __hermes__.logger.debug(
1689 f"Removing local object of {pkey=}:"
1690 f" {l_ev.toString(secretAttrs)=}"
1691 )
1692 self.__localRemoved(l_ev)
1693 else:
1694 __hermes__.logger.error(f"Local object of {pkey=} not found")
1696 # All objects have been removed, remove remaining events from
1697 # errorqueue, if any
1698 for l_obj in self.__datamodel.localdata_complete[l_objtype]:
1699 # Remove any events related to the current object from the error queue
1700 self.__datamodel.errorqueue.purgeAllEventsOfDataObject(
1701 l_obj, isLocalObjtype=True
1702 )
1704 self.__datamodel.saveLocalAndRemoteData() # Save changes
1706 # Purge old remote and local cache files, if any
1707 __hermes__.logger.info(
1708 f"Types removed from Datamodel: {diff.removed}, purging cache files"
1709 )
1710 Datamodel.purgeOldCacheFiles(diff.removed, cacheFilePrefix="__")
1712 # Start working with new datamodel
1713 self.__datamodel = self.__newdatamodel
1714 self.__datamodel.loadLocalAndRemoteData()
1716 # Reload error queue to allow it to handle new datamodel
1717 self.__datamodel.saveErrorQueue()
1718 self.__datamodel.loadErrorQueue()
1720 if diff.added or diff.modified:
1721 # Generate diff events according to datamodel changes
1722 new_local_data: dict[str, DataObjectList] = {}
1724 # Work on the "complete" copy of the data cache, i.e. the cache as it would
1725 # be if no event were pending in the error queue, so the diff is computed
1726 # on a complete cache representation
1727 completeRemoteData = self.__datamodel.remotedata_complete
1728 completeLocalData = self.__datamodel.localdata_complete
1730 # Loop over each remote type
1731 for r_objtype in self.__datamodel.remote_schema.objectTypes:
1732 # For each type, we'll work on classic data, and on trashbin data
1733 for prefix in ("", "trashbin_"):
1734 if r_objtype not in self.__datamodel.typesmapping:
1735 continue # Remote objtype isn't set in current Datamodel
1737 # Fetch corresponding local type
1738 l_objtype = f"{prefix}{self.__datamodel.typesmapping[r_objtype]}"
1740 # Convert remote data cache to local data
1741 new_local_data[l_objtype] = (
1742 self.__datamodel.convertDataObjectListToLocal(
1743 r_objtype, completeRemoteData[f"{prefix}{r_objtype}"]
1744 )
1745 )
1747 # Compute differences between new local data and local data cache
1748 completeLocalDataObjtype: DataObjectList = completeLocalData.get(
1749 l_objtype, DataObjectList([])
1750 )
1751 datadiff = new_local_data[l_objtype].diffFrom(
1752 completeLocalDataObjtype
1753 )
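# datadiff.dict presumably maps each change type (e.g. "added", "modified",
# "removed") to its list of diff items, each of which is converted to a
# local event below.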
1755 for changeType, difflist in datadiff.dict.items():
1756 diffitem: DiffObject | DataObject
1757 for diffitem in difflist:
1758 # Convert diffitem to local Event
1759 (event, obj) = Event.fromDiffItem(
1760 diffitem=diffitem,
1761 eventCategory="base",
1762 changeType=changeType,
1763 )
1765 if prefix == "trashbin_":
1766 if obj not in completeLocalDataObjtype:
1767 # Object exists in the remote trashbin, but not in
1768 # the local one, as it was removed before its type
1769 # was added to the client's Datamodel.
1770 # Process a local "added" event, then a local
1771 # "removed" event, to store the local object in the trashbin
1773 # Add local object
1774 self.__processLocalEvent(
1775 None, event, enqueueEventWithError=True
1776 )
1778 # Prepare "removed" event
1779 event = Event(
1780 evcategory="base",
1781 eventtype="removed",
1782 obj=obj,
1783 objattrs={},
1784 )
1785 # Preserve object _trashbin_timestamp
1786 event.timestamp = completeRemoteData[
1787 f"{prefix}{r_objtype}"
1788 ][obj]._trashbin_timestamp
1789 else:
1790 # Preserve object _trashbin_timestamp
1791 obj._trashbin_timestamp = completeLocalDataObjtype[
1792 obj
1793 ]._trashbin_timestamp
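# _trashbin_timestamp presumably records when the object entered the
# trashbin, so that any trashbin retention delay keeps counting from the
# original removal rather than from this datamodel update.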
1795 # Process the event and update the cache if no error occurs;
1796 # otherwise enqueue the event
1797 self.__processLocalEvent(
1798 None, event, enqueueEventWithError=True
1799 )
1801 self.__config.savecachefile() # Save config to be able to rebuild datamodel
1802 self.__datamodel.saveLocalAndRemoteData() # Save data
1803 self.__checkDatamodelWarnings()
1805 def __status(
1806 self, verbose=False, level="information", ignoreUnhandledExceptions=False
1807 ) -> dict[str, dict[str, dict[str, Any]]]:
1808 """Returns a dict containing status for current client Datamodel and error
1809 queue.
1811 Each status contains 3 categories/levels: "information", "warning" and "error"
1812 """
1813 if level not in ("information", "warning", "error"):
1814 raise AttributeError(
1815 f"Specified level '{level}' is invalid. Possible values are"
1816 """("information", "warning", "error"):"""
1817 )
1819 appname: str = self.__config["appname"]
1821 match level:
1822 case "error":
1823 levels = ["error"]
1824 case "warning":
1825 levels = [
1826 "warning",
1827 "error",
1828 ]
1829 case "information":
1830 levels = [
1831 "information",
1832 "warning",
1833 "error",
1834 ]
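# levels lists the categories to keep in the result, from the requested
# level up to "error" (e.g. level="warning" keeps "warning" and "error").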
1836 res = {
1837 appname: {
1838 "information": {
1839 "startTime": self.__startTime.strftime("%Y-%m-%d %H:%M:%S"),
1840 "status": "paused" if self.__isPaused else "running",
1841 "pausedSince": (
1842 self.__isPaused.strftime("%Y-%m-%d %H:%M:%S")
1843 if self.__isPaused
1844 else "None"
1845 ),
1846 },
1847 "warning": {},
1848 "error": {},
1849 },
1850 }
1851 if not ignoreUnhandledExceptions and self.__cache.exception:
1852 res[appname]["error"]["unhandledException"] = self.__cache.exception
1854 # Datamodel
1855 res["datamodel"] = {
1856 "information": {},
1857 "warning": {},
1858 "error": {},
1859 }
1860 if self.__datamodel.unknownRemoteTypes:
1861 res["datamodel"]["warning"]["unknownRemoteTypes"] = sorted(
1862 self.__datamodel.unknownRemoteTypes
1863 )
1864 if self.__datamodel.unknownRemoteAttributes:
1865 res["datamodel"]["warning"]["unknownRemoteAttributes"] = {
1866 k: sorted(v)
1867 for k, v in self.__datamodel.unknownRemoteAttributes.items()
1868 }
1870 # Error queue
1871 res["errorQueue"] = {
1872 "information": {},
1873 "warning": {},
1874 "error": {},
1875 }
1877 if self.__datamodel.errorqueue is not None:
1878 eventNumber: int
1879 remoteEvent: Event | None
1880 localEvent: Event
1881 errorMsg: str
1882 for (
1883 eventNumber,
1884 remoteEvent,
1885 localEvent,
1886 errorMsg,
1887 ) in self.__datamodel.errorqueue:
1888 if errorMsg is None:
1889 # Ignore events in the queue that are not errors
1890 continue
1892 # Always try to get the object from the local cache in order to use the
1893 # configured toString template for the object's repr()
1894 objtype = localEvent.objtype
1896 _, obj = Datamodel.getObjectFromCacheOrTrashbin(
1897 self.__datamodel.localdata_complete, objtype, localEvent.objpkey
1898 )
1899 if obj is None:
1900 _, obj = Datamodel.getObjectFromCacheOrTrashbin(
1901 self.__datamodel.localdata, objtype, localEvent.objpkey
1902 )
1903 if obj is None and remoteEvent is not None:
1904 _, obj = Datamodel.getObjectFromCacheOrTrashbin(
1905 self.__datamodel.remotedata_complete,
1906 remoteEvent.objtype,
1907 remoteEvent.objpkey,
1908 )
1909 if obj is None:
1910 obj = f"<{localEvent.objtype}[{localEvent.objpkey}]>"
1911 else:
1912 obj = repr(obj)
1914 res["errorQueue"]["error"][eventNumber] = {
1915 "objrepr": obj,
1916 "errorMsg": errorMsg,
1917 }
1918 if verbose:
1919 res["errorQueue"]["error"][eventNumber] |= {
1920 "objtype": localEvent.objtype,
1921 "objpkey": localEvent.objpkey,
1922 "objattrs": localEvent.objattrs,
1923 }
1925 # Clean empty categories
1926 for objname in list(res.keys()):
1927 for category in ("information", "warning", "error"):
1928 if category not in levels or (
1929 not verbose and not res[objname][category]
1930 ):
1931 del res[objname][category]
1933 if not verbose and not res[objname]:
1934 del res[objname]
1936 return res
1938 def __notifyQueueErrors(self):
1939 """Notify of any objects change in error queue"""
1940 new_error = self.__status(level="error", ignoreUnhandledExceptions=True)
1942 new_errors = {}
1943 if "errorQueue" in new_error:
1944 for errNumber, err in new_error["errorQueue"]["error"].items():
1945 new_errors[errNumber] = f"{err['objrepr']}: {err['errorMsg']}"
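# new_errors maps each error-queue event number to a one-line summary,
# e.g. (hypothetical) {12: "<Users[42]>: something went wrong"}; it is
# compared against the cached version to decide whether to notify.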
1947 new_errstr = json.dumps(
1948 new_errors,
1949 cls=JSONEncoder,
1950 indent=4,
1951 )
1952 old_errstr = json.dumps(self.__cache.queueErrors, cls=JSONEncoder, indent=4)
1954 if new_errstr != old_errstr:
1955 if new_errors:
1956 desc = "objects in error queue have changed"
1957 else:
1958 desc = "no more objects in error queue"
1960 __hermes__.logger.info(desc)
1961 Email.sendDiff(
1962 config=self.__config,
1963 contentdesc=desc,
1964 previous=old_errstr,
1965 current=new_errstr,
1966 )
1967 self.__cache.queueErrors = new_errors
1969 def __notifyDatamodelWarnings(self):
1970 """Notify of any data model warnings changes"""
1971 new_errors = self.__status(level="warning", ignoreUnhandledExceptions=True)
1973 if "datamodel" not in new_errors:
1974 new_errors = {}
1975 else:
1976 new_errors = new_errors["datamodel"]
1978 new_errstr = json.dumps(
1979 new_errors,
1980 cls=JSONEncoder,
1981 indent=4,
1982 )
1983 old_errstr = json.dumps(
1984 self.__cache.datamodelWarnings, cls=JSONEncoder, indent=4
1985 )
1987 if new_errors:
1988 __hermes__.logger.error("Datamodel has warnings:\n" + new_errstr)
1990 if new_errstr != old_errstr:
1991 if new_errors:
1992 desc = "datamodel warnings have changed"
1993 else:
1994 desc = "no more datamodel warnings"
1996 __hermes__.logger.info(desc)
1997 Email.sendDiff(
1998 config=self.__config,
1999 contentdesc=desc,
2000 previous=old_errstr,
2001 current=new_errstr,
2002 )
2003 self.__cache.datamodelWarnings = new_errors
2005 def __notifyException(self, trace: str | None):
2006 """Notify of any unhandled exception met/solved"""
2007 if trace:
2008 __hermes__.logger.critical(f"Unhandled exception: {trace}")
2010 if self.__cache.exception != trace:
2011 if trace:
2012 desc = "unhandled exception"
2013 else:
2014 desc = "no more unhandled exception"
2016 __hermes__.logger.info(desc)
2017 previous = "" if self.__cache.exception is None else self.__cache.exception
2018 current = "" if trace is None else trace
2019 Email.sendDiff(
2020 config=self.__config,
2021 contentdesc=desc,
2022 previous=previous,
2023 current=current,
2024 )
2025 self.__cache.exception = trace
2027 def __notifyFatalException(self, trace: str):
2028 """Notify of any fatal exception met before, and terminate app"""
2029 self.__isStopped = True
2031 desc = "Unhandled fatal exception, APP WILL TERMINATE IMMEDIATELY"
2032 NL = "\n"
2034 __hermes__.logger.critical(f"{desc}: {trace}")
2036 Email.send(
2037 config=self.__config,
2038 subject=f"[{self.__config['appname']}] {desc}",
2039 content=f"{desc}:{NL}{NL}{trace}",
2040 )