Coverage for server/hermesserver.py: 69%
360 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from lib.config import HermesConfig
24from lib.version import HERMES_VERSION
25from lib.datamodel.dataschema import Dataschema
26from lib.datamodel.dataobject import DataObject
27from lib.datamodel.datasource import Datasource
28from lib.datamodel.diffobject import DiffObject
29from lib.datamodel.event import Event
30from lib.datamodel.serialization import LocalCache, JSONEncoder
31from lib.plugins import AbstractMessageBusProducerPlugin, FailedToSendEventError
32from lib.utils.mail import Email
33from lib.utils.socket import (
34 SockServer,
35 SocketMessageToServer,
36 SocketMessageToClient,
37 SocketArgumentParser,
38 SocketParsingError,
39 SocketParsingMessage,
40)
41from server.datamodel import Datamodel
43from datetime import datetime, timedelta
44import argparse
45import json
46import time
47import signal
48import traceback
49from types import FrameType
50from typing import Any
53class HermesServerCache(LocalCache):
54 """Hermes server data to cache"""
56 def __init__(self, from_json_dict: dict[str, Any] = {}):
57 super().__init__(
58 jsondataattr=["lastUpdate", "errors", "exception"],
59 cachefilename="_hermes-server",
60 )
62 self.lastUpdate: datetime | None = from_json_dict.get("lastUpdate")
63 """Datetime of latest update"""
65 self.errors: dict[str, dict[str, dict[str, Any]]] = from_json_dict.get(
66 "errors", {}
67 )
68 """Dictionary containing current errors, for notifications"""
70 self.exception: str | None = from_json_dict.get("exception")
71 """String containing latest exception trace"""
73 def savecachefile(self, cacheFilename: str | None = None):
74 """Override method only to disable backup files in cache"""
75 return super().savecachefile(cacheFilename, dontKeepBackup=True)
77 # Example of cache migration method
78 # @classmethod
79 # def migrate_from_v0_0_3_to_v0_1_0(
80 # cls: "HermesServerCache", jsondict: Any | dict[Any, Any]
81 # ) -> Any | dict[Any, Any]:
82 # jsondict["lastUpdate"] = datetime.fromisoformat(jsondict["lastUpdate"])
83 # return jsondict
86class HermesServer:
87 """Hermes-server main class"""
89 def __init__(self, config: HermesConfig):
90 """Set up a server instance.
91 The mainloop() method MUST then be called to start the service"""
93 __hermes__.logger.info(f"Starting {config['appname']} v{HERMES_VERSION}")
95 # Setup the signals handler
96 config.setSignalsHandler(self.signalHandler)
98 self.config: HermesConfig = config
99 self._msgbus: AbstractMessageBusProducerPlugin = self.config["hermes"][
100 "plugins"
101 ]["messagebus"]["plugininstance"]
102 self.dm: Datamodel = Datamodel(self.config)
104 self._cache: HermesServerCache = HermesServerCache.loadcachefile(
105 "_hermes-server"
106 )
107 """Cached attributes"""
109 self._initSyncRequested: bool = False
110 """Indicate that an initsync sequence has been requested"""
111 self._isStopped: bool = False
112 """mainloop() will run until this var is set to True"""
113 self._isPaused: datetime | None = None
114 """Contains pause datetime if standard processing is paused, None otherwise"""
115 self._forceUpdate = False
116 """Indicate that a (forced) update command has been requested"""
117 self._updateInterval: timedelta = timedelta(
118 seconds=config["hermes-server"]["updateInterval"]
119 )
120 """Interval between two update"""
121 self._numberOfLoopToProcess: int | None = None
122 """**For functionnal tests only**, if a value is set, will process for *value*
123 iterations of mainloop and pause execution until a new positive value is set"""
125 self._firstFetchDone = False
126 """Indicate if a full data set has been fetched since start"""
128 now = datetime.now()
129 self._nextUpdate: datetime = now
130 """Datetime to wait before processing next update"""
131 if (
132 self._cache.lastUpdate
133 and now < self._cache.lastUpdate + self._updateInterval
134 ):
135 self._nextUpdate = self._cache.lastUpdate + self._updateInterval
137 self.startTime: datetime | None = None
138 """Datetime when mainloop was started"""
140 self._sock: SockServer | None = None
141 if (
142 config["hermes"]["cli_socket"]["path"] is not None
143 or config["hermes"]["cli_socket"]["dont_manage_sockfile"] is not None
144 ):
145 self._sock = SockServer(
146 path=config["hermes"]["cli_socket"]["path"],
147 owner=config["hermes"]["cli_socket"]["owner"],
148 group=config["hermes"]["cli_socket"]["group"],
149 mode=config["hermes"]["cli_socket"]["mode"],
150 processHdlr=self._processSocketMessage,
151 dontManageSockfile=config["hermes"]["cli_socket"][
152 "dont_manage_sockfile"
153 ],
154 )
155 self.__setupSocketParser()
157 def __setupSocketParser(self):
158 """Set up the argparse context for unix socket commands"""
159 self._parser = SocketArgumentParser(
160 prog=f"{self.config['appname']}-cli",
161 description="Hermes Server CLI",
162 exit_on_error=False,
163 )
165 subparsers = self._parser.add_subparsers(help="Sub-commands")
167 # Initsync
168 sp_initsync = subparsers.add_parser(
169 "initsync",
170 help=(
171 "Send specific init message containing all data but passwords."
172 " Useful to fill new client"
173 ),
174 )
175 sp_initsync.set_defaults(func=self.sock_initsync)
177 # Update
178 sp_update = subparsers.add_parser(
179 "update",
180 help="Force update now, ignoring updateInterval",
181 )
182 sp_update.set_defaults(func=self.sock_update)
184 # Quit
185 sp_quit = subparsers.add_parser("quit", help="Stop server")
186 sp_quit.set_defaults(func=self.sock_quit)
188 # Pause
189 sp_pause = subparsers.add_parser(
190 "pause", help="Pause processing until 'resume' command is sent"
191 )
192 sp_pause.set_defaults(func=self.sock_pause)
194 # Resume
195 sp_resume = subparsers.add_parser(
196 "resume", help="Resume processing that has been paused with 'pause'"
197 )
198 sp_resume.set_defaults(func=self.sock_resume)
200 # Status
201 sp_status = subparsers.add_parser("status", help="Show server status")
202 sp_status.set_defaults(func=self.sock_status)
203 sp_status.add_argument(
204 "-j",
205 "--json",
206 action="store_const",
207 const=True,
208 default=False,
209 help="Print status as json",
210 )
211 sp_status.add_argument(
212 "-v",
213 "--verbose",
214 action="store_const",
215 const=True,
216 default=False,
217 help="Output items without values",
218 )
220 def signalHandler(self, signalnumber: int, frame: FrameType | None):
221 """Signal handler that will be called on SIGINT and SIGTERM"""
222 __hermes__.logger.critical(
223 f"Signal '{signal.strsignal(signalnumber)}' received, terminating"
224 )
225 self._isStopped = True
227 def _processSocketMessage(
228 self, msg: SocketMessageToServer
229 ) -> SocketMessageToClient:
230 """Handler that process specified msg received on unix socket and returns the
231 answer to send"""
232 reply: SocketMessageToClient | None = None
234 try:
235 args = self._parser.parse_args(msg.argv)
236 if "func" not in args:
237 raise SocketParsingMessage(self._parser.format_help())
238 except (SocketParsingError, SocketParsingMessage) as e:
239 retmsg = str(e)
240 except argparse.ArgumentError as e:
241 retmsg = self._parser.format_error(str(e))
242 else:
243 try:
244 reply = args.func(args)
245 except Exception as e:
246 lines = traceback.format_exception(type(e), e, e.__traceback__)
247 trace = "".join(lines).strip()
248 __hermes__.logger.critical(f"Unhandled exception: {trace}")
249 retmsg = trace
251 if reply is None: # Error was met
252 reply = SocketMessageToClient(retcode=1, retmsg=retmsg)
254 return reply
256 def sock_initsync(self, args: argparse.Namespace) -> SocketMessageToClient:
257 """Handler called when a valid initsync subcommand is requested on unix
258 socket"""
259 self._initSyncRequested = True
260 return SocketMessageToClient(retcode=0, retmsg="")
262 def sock_update(self, args: argparse.Namespace) -> SocketMessageToClient:
263 """Handler called when a valid update subcommand is requested on unix socket"""
264 self._forceUpdate = True
265 return SocketMessageToClient(retcode=0, retmsg="")
267 def sock_quit(self, args: argparse.Namespace) -> SocketMessageToClient:
268 """Handler called when quit subcommand is requested on unix socket"""
269 self._isStopped = True
270 __hermes__.logger.info("hermes-server has been requested to quit")
271 return SocketMessageToClient(retcode=0, retmsg="")
273 def sock_pause(self, args: argparse.Namespace) -> SocketMessageToClient:
274 """Handler called when pause subcommand is requested on unix socket"""
275 if self._isStopped:
276 return SocketMessageToClient(
277 retcode=1, retmsg="Error: server is currently being stopped"
278 )
280 if self._isPaused:
281 return SocketMessageToClient(
282 retcode=1, retmsg="Error: server is already paused"
283 )
285 __hermes__.logger.info("hermes-server has been requested to pause")
286 self._isPaused = datetime.now()
287 return SocketMessageToClient(retcode=0, retmsg="")
289 def sock_resume(self, args: argparse.Namespace) -> SocketMessageToClient:
290 """Handler called when resume subcommand is requested on unix socket"""
291 if self._isStopped:
292 return SocketMessageToClient(
293 retcode=1, retmsg="Error: server is currently being stopped"
294 )
296 if not self._isPaused:
297 return SocketMessageToClient(
298 retcode=1, retmsg="Error: server is not paused"
299 )
301 __hermes__.logger.info("hermes-server has been requested to resume")
302 self._isPaused = None
303 return SocketMessageToClient(retcode=0, retmsg="")
305 def sock_status(self, args: argparse.Namespace) -> SocketMessageToClient:
306 """Handler called when status subcommand is requested on unix socket"""
307 status = self.status(verbose=args.verbose)
308 if args.json:
309 msg = json.dumps(status, indent=4)
310 else:
311 nl = "\n"
312 info2printable = {
313 "inconsistencies": "Inconsistencies",
314 "mergeConflicts": "Merge conflicts",
315 "integrityFiltered": "Filtered by integrity constraints",
316 "mergeFiltered": "Filtered by merge constraints",
317 }
318 msg = ""
319 for objname in ["hermes-server"] + list(status.keys() - ("hermes-server",)):
320 infos = status[objname]
321 msg += f"{objname}:{nl}"
322 for category in ("information", "warning", "error"):
323 if category not in infos:
324 continue
325 if not infos[category]:
326 msg += f" * {category.capitalize()}: []{nl}"
327 continue
329 msg += f" * {category.capitalize()}{nl}"
330 for infoname, infodata in infos[category].items():
331 indentedinfodata = str(infodata).replace("\n", "\n ")
332 msg += (
333 f" - {info2printable.get(infoname, infoname)}:"
334 f" {indentedinfodata}{nl}"
335 )
336 msg = msg.rstrip()
338 return SocketMessageToClient(retcode=0, retmsg=msg)
340 def _checkForSchemaChanges(self):
341 curschema: Dataschema = self.dm.dataschema
342 oldschema: Dataschema = Dataschema.loadcachefile("_dataschema")
343 diff = curschema.diffFrom(oldschema)
345 if diff:
346 old: dict[str, Any] = oldschema.schema
347 new: dict[str, Any] = curschema.schema
348 if old:
349 __hermes__.logger.info("Dataschema has changed since last run")
350 else:
351 __hermes__.logger.info("Loading first dataschema")
353 if diff.added:
354 __hermes__.logger.info(f"Types added in Dataschema: {diff.added}")
356 if diff.removed:
357 __hermes__.logger.info(
358 f"Types removed from Dataschema: {diff.removed},"
359 " generate events to mark data as deleted"
360 )
362 # Create a datasource with same content as cache, minus the types to
363 # remove
364 olddata: Datasource = Datasource(
365 schema=oldschema, enableTrashbin=False, enableCache=False
366 )
367 olddata.loadFromCache()
369 # Create an empty datasource and copy the data types to keep into it
370 newdata: Datasource = Datasource(
371 schema=oldschema, enableTrashbin=False, enableCache=False
372 )
373 for objtype in oldschema.schema.keys():
374 if objtype not in diff.removed:
375 newdata[objtype] = olddata[objtype]
377 # Send remove event of each entry of each removed type
378 self.generateAndSendEvents(
379 eventCategory="base",
380 data=newdata,
381 cache=olddata,
382 save=True,
383 commit=False,
384 sendEvents=True,
385 )
387 __hermes__.logger.info(
388 f"Types removed from Dataschema: {diff.removed},"
389 " purging cache files"
390 )
391 for objtype in diff.removed:
392 LocalCache.deleteAllCacheFiles(objtype)
394 if diff.modified:
395 for objtype in diff.modified:
396 n = new[objtype]
397 o = old[objtype]
398 # HERMES_ATTRIBUTES
399 added = n["HERMES_ATTRIBUTES"] - o["HERMES_ATTRIBUTES"]
400 removed = o["HERMES_ATTRIBUTES"] - n["HERMES_ATTRIBUTES"]
401 if added:
402 __hermes__.logger.info(
403 f"New attributes in dataschema type '{objtype}': {added}"
404 )
405 if removed:
406 __hermes__.logger.info(
407 f"Removed attributes from dataschema type '{objtype}':"
408 f" {removed}"
409 )
411 # SECRETS_ATTRIBUTES
412 added = n["SECRETS_ATTRIBUTES"] - o["SECRETS_ATTRIBUTES"]
413 removed = o["SECRETS_ATTRIBUTES"] - n["SECRETS_ATTRIBUTES"]
414 if added:
415 __hermes__.logger.info(
416 f"New secrets attributes in dataschema type '{objtype}':"
417 f" {added}"
418 )
419 # We need to purge attribute from cache: as cache is loaded with
420 # attribute set up as SECRET, we just have to save the cache
421 # (attr won't be saved anymore, as it's SECRET) and reload
422 # cache to "forget" values loaded from previous cache
423 self.dm.data.cache.save()
424 self.dm.data.cache.loadFromCache()
425 if removed:
426 __hermes__.logger.info(
427 "Removed secrets attributes from dataschema type"
428 f" '{objtype}': {removed}"
429 )
431 if old:
432 e = Event(
433 evcategory="base",
434 eventtype="dataschema",
435 objattrs=new,
436 )
437 __hermes__.logger.info(
438 f"Sending new schema on message bus {e.toString(set())}"
439 )
440 with self._msgbus:
441 self._msgbus.send(event=e)
443 self.dm.dataschema.savecachefile()
445 def mainLoop(self):
446 """Server main loop"""
447 self.startTime = datetime.now()
449 if self._sock is not None:
450 self._sock.startProcessMessagesDaemon(appname=__hermes__.appname)
452 # Process schema changes if any, until it succeed
453 checkForSchemaChangesDone = False
454 while not self._isStopped and not checkForSchemaChangesDone:
455 try:
456 self._checkForSchemaChanges()
457 except Exception as e:
458 lines = traceback.format_exception(type(e), e, e.__traceback__)
459 trace = "".join(lines).strip()
460 self.notifyException(trace)
461 self._cache.savecachefile()
462 else:
463 checkForSchemaChangesDone = True
465 # Reduce sleep duration during functional tests to speed them up
466 sleepDuration = 1 if self._numberOfLoopToProcess is None else 0.05
468 while not self._isStopped:
469 try:
470 if self._initSyncRequested:
471 self.initsync()
472 self._initSyncRequested = False
474 if self._numberOfLoopToProcess is None:
475 # Normal operations
476 updateRequired = self._forceUpdate or (
477 not self._isPaused and datetime.now() >= self._nextUpdate
478 )
479 else:
480 # Special case for functional tests
481 updateRequired = self._numberOfLoopToProcess > 0
483 if not updateRequired:
484 time.sleep(sleepDuration)
485 if self._nextUpdate + self._updateInterval < datetime.now():
486 # Keep updating _nextUpdate even when paused, ensuring that its
487 # value remains in the past. This will avoid an uninterrupted
488 # update sequence to make up for the pause time
489 self._nextUpdate += self._updateInterval
490 continue
492 # Standard run
493 if self._forceUpdate:
494 self._forceUpdate = False
495 else:
496 self._nextUpdate += self._updateInterval
498 self.dm.fetch()
499 self.generateAndSendEvents(
500 eventCategory="base",
501 data=self.dm.data,
502 cache=self.dm.data.cache,
503 save=True,
504 commit=True,
505 sendEvents=(self._cache.lastUpdate is not None),
506 )
507 self._cache.lastUpdate = datetime.now()
508 self.notifyException(None)
509 self._cache.savecachefile()
511 except Exception as e:
512 lines = traceback.format_exception(type(e), e, e.__traceback__)
513 trace = "".join(lines).strip()
514 self.notifyException(trace)
515 self._cache.savecachefile()
517 __hermes__.logger.warning(
518 "An error was met. Waiting 60 seconds before retrying"
519 )
520 # Wait one second 60 times to avoid waiting too long before stopping
521 for i in range(60):
522 if self._isStopped:
523 break
524 time.sleep(1)
526 # Only used in functionnal tests
527 if self._numberOfLoopToProcess:
528 self._numberOfLoopToProcess -= 1
530 self._cache.savecachefile()
532 def status(
533 self, verbose=False, level="information", ignoreUnhandledExceptions=False
534 ) -> dict[str, dict[str, dict[str, Any]]]:
535 """Returns a dict containing status for hermes-server and each defined type in
536 datamodel.
538 Each status contains 3 categories/levels: ""information", "warning" and "error"
539 """
540 if level not in ("information", "warning", "error"):
541 raise AttributeError(
542 f"Specified level '{level}' is invalid."
543 """ Possible values are ("information", "warning", "error"):"""
544 )
546 if level == "error":
547 levels = ["error"]
548 elif level == "warning":
549 levels = [
550 "warning",
551 "error",
552 ]
553 elif level == "information":
554 levels = [
555 "information",
556 "warning",
557 "error",
558 ]
560 res = {
561 "hermes-server": {
562 "information": {
563 "startTime": self.startTime.strftime("%Y-%m-%d %H:%M:%S"),
564 "status": "paused" if self._isPaused else "running",
565 "pausedSince": (
566 self._isPaused.strftime("%Y-%m-%d %H:%M:%S")
567 if self._isPaused
568 else "None"
569 ),
570 "lastUpdate": (
571 self._cache.lastUpdate.strftime("%Y-%m-%d %H:%M:%S")
572 if self._cache.lastUpdate
573 else "None"
574 ),
575 "nextUpdate": self._nextUpdate.strftime("%Y-%m-%d %H:%M:%S"),
576 },
577 "warning": {},
578 "error": {},
579 },
580 }
581 if not ignoreUnhandledExceptions and self._cache.exception:
582 res["hermes-server"]["error"]["unhandledException"] = self._cache.exception
584 for objname, objlist in self.dm.data.items():
585 res[objname] = {
586 "information": {},
587 "warning": {},
588 "error": {},
589 }
590 if not self._firstFetchDone and objname in self._cache.errors:
591 res[objname] |= self._cache.errors[objname]
593 for level, src in [
594 ("error", "inconsistencies"),
595 ("error", "mergeConflicts"),
596 ("warning", "integrityFiltered"),
597 ("warning", "mergeFiltered"),
598 ]:
599 if getattr(objlist, src):
600 res[objname][level][src] = []
601 for pkey in sorted(getattr(objlist, src)):
602 obj = objlist.get(pkey)
603 objrepr = pkey if obj is None else repr(obj)
604 res[objname][level][src].append(objrepr)
606 for objname in self.dm.data.keys() | ("hermes-server",):
607 for category in ("information", "warning", "error"):
608 if category not in levels or (
609 not verbose and not res[objname][category]
610 ):
611 del res[objname][category]
613 if not verbose and not res[objname]:
614 del res[objname]
616 return res
618 def initsync(self):
619 """Send an initsync sequence"""
620 empty: Datasource = Datasource(
621 schema=self.dm.dataschema, enableTrashbin=False, enableCache=False
622 )
623 self.generateAndSendEvents(
624 eventCategory="initsync",
625 data=self.dm.data.cache,
626 cache=empty,
627 save=False,
628 commit=False,
629 sendEvents=True,
630 )
632 def generateAndSendEvents(
633 self,
634 eventCategory: str,
635 data: Datasource,
636 cache: Datasource,
637 save: bool,
638 commit: bool,
639 sendEvents: bool,
640 ):
641 """Generate and send events of specified eventCategory ("base" or "initsync"),
642 computed upon differences between specified data and cache.
643 If save is True, cache will be updated and saved on disk.
644 If sendEvents is True, events will be sent on msgbus.
645 If commit and sendEvents are True, the datamodel commit_one and commit_all
646 methods will be called"""
648 if eventCategory not in ("base", "initsync"):
649 err = f"Specified eventType '{eventCategory}' is invalid"
650 __hermes__.logger.critical(err)
651 raise ValueError(err)
653 with self._msgbus:
654 if eventCategory == "initsync" and sendEvents:
655 self._msgbus.send(
656 Event(
657 evcategory=eventCategory,
658 eventtype="init-start",
659 obj=None,
660 objattrs=self.dm.dataschema.schema, # Send current schema
661 )
662 )
664 # Loop over each datamodel type to compute diffs
665 diffs: dict[str, DiffObject] = {}
666 for objtype in data.keys():
667 # Generate diff between fresh data and cache
668 diff = data[objtype].diffFrom(cache[objtype])
669 diffs[objtype] = diff
670 if diff:
671 __hermes__.logger.info(
672 f"{objtype} have changed: {len(diff.added)} added,"
673 f" {len(diff.modified)} modified,"
674 f" {len(diff.removed)} removed"
675 )
677 # Process events
678 for changeType in ["added", "modified", "removed"]:
679 if changeType == "removed":
680 # Process removed events in the datamodel reversed declaration order
681 objTypes = reversed(data.keys())
682 else:
683 # Process other events in the datamodel declaration order
684 objTypes = data.keys()
686 for objtype in objTypes:
687 secretAttrs = data.schema.secretsAttributesOf(objtype)
688 diff = diffs[objtype]
689 difflist = diff.dict[changeType]
690 # Loop over each diff item of current changeType and create event
691 diffitem: DiffObject | DataObject
692 for diffitem in difflist:
693 (event, obj) = Event.fromDiffItem(
694 diffitem, eventCategory, changeType
695 )
697 if sendEvents:
698 # Send event
699 try:
700 __hermes__.logger.info(
701 f"Sending {event.toString(secretAttrs)}"
702 )
703 self._msgbus.send(event=event)
704 except FailedToSendEventError as e:
705 # Event not sent
706 if save:
707 cache.save()
708 __hermes__.logger.critical(
709 f"Failed to send event. Execution aborted: {str(e)}"
710 )
711 raise
713 # Event sent, validate its changes
714 if eventCategory == "base":
715 if sendEvents and commit:
716 self.dm.commit_one(obj)
718 match event.eventtype:
719 case "added":
720 cache[objtype].append(obj)
721 case "removed":
722 cache[objtype].remove(obj)
723 case "modified":
724 cache[objtype].replace(obj)
726 if sendEvents and commit:
727 self.dm.commit_all(objtype)
729 if save:
730 cache.save()
732 if eventCategory == "initsync" and sendEvents:
733 self._msgbus.send(
734 Event(
735 evcategory=eventCategory,
736 eventtype="init-stop",
737 obj=None,
738 objattrs={},
739 )
740 )
742 self._firstFetchDone = True
743 self.notifyErrors()
745 def notifyErrors(self):
746 """Notify of any data error met/solved"""
747 new_errors = self.status(level="error", ignoreUnhandledExceptions=True)
749 new_errstr = json.dumps(
750 new_errors,
751 cls=JSONEncoder,
752 indent=4,
753 )
754 old_errstr = json.dumps(self._cache.errors, cls=JSONEncoder, indent=4)
756 nl = "\n"
758 if new_errors:
759 __hermes__.logger.error(f"Data errors met: {nl}{new_errstr}")
761 if new_errstr != old_errstr:
762 if new_errors:
763 desc = "data errors met"
764 else:
765 desc = "no more data errors"
767 __hermes__.logger.info(desc)
768 Email.sendDiff(
769 config=self.config,
770 contentdesc=desc,
771 previous=old_errstr,
772 current=new_errstr,
773 )
774 self._cache.errors = new_errors
776 def notifyException(self, trace: str | None):
777 """Notify of any unhandled exception met/solved"""
778 if trace:
779 __hermes__.logger.critical(f"Unhandled exception: {trace}")
781 if self._cache.exception != trace:
782 if trace:
783 desc = "unhandled exception"
784 else:
785 desc = "no more unhandled exception"
787 __hermes__.logger.info(desc)
788 previous = "" if self._cache.exception is None else self._cache.exception
789 current = "" if trace is None else trace
790 Email.sendDiff(
791 config=self.config,
792 contentdesc=desc,
793 previous=previous,
794 current=current,
795 )
796 self._cache.exception = trace