Coverage for clients/__init__.py: 83%

882 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from clients.datamodel import Datamodel, InvalidDataError 

24from lib.config import HermesConfig 

25from lib.version import HERMES_VERSION 

26from lib.datamodel.dataobject import DataObject 

27from lib.datamodel.dataobjectlist import DataObjectList 

28from lib.datamodel.datasource import Dataschema, Datasource 

29from lib.datamodel.diffobject import DiffObject 

30from lib.datamodel.event import Event 

31from lib.datamodel.serialization import LocalCache, JSONEncoder 

32from lib.plugins import AbstractMessageBusConsumerPlugin 

33from lib.utils.mail import Email 

34from lib.utils.socket import ( 

35 SockServer, 

36 SocketMessageToServer, 

37 SocketMessageToClient, 

38 SocketArgumentParser, 

39 SocketParsingError, 

40 SocketParsingMessage, 

41) 

42 

43from copy import deepcopy 

44from datetime import datetime, timedelta 

45from time import sleep 

46from types import FrameType 

47from typing import Any 

48import argparse 

49import json 

50import signal 

51import traceback 

52 

53 

54class HermesAlreadyNotifiedException(Exception): 

55 """Raised when an exception has already been notified, to avoid a second 

56 notification""" 

57 

58 

59class HermesClientHandlerError(Exception): 

60 """Raised when an exception is met during client handler call""" 

61 

62 def __init__(self, err: Exception | str | None): 

63 if isinstance(err, Exception): 

64 self.msg: str | None = HermesClientHandlerError.exceptionToString(err) 

65 """Printable error message, None in a rare case where exception is raised 

66 without error, but to postpone an event processing""" 

67 else: 

68 self.msg = err 

69 super().__init__(self.msg) 

70 

71 @staticmethod 

72 def exceptionToString( 

73 exception: Exception, purgeCurrentFileFromTrace: bool = True 

74 ) -> str: 

75 """Convert the specified exception to a string containing its full trace""" 

76 lines = traceback.format_exception( 

77 type(exception), exception, exception.__traceback__ 

78 ) 

79 

80 if purgeCurrentFileFromTrace: 

81 # Purging current file infos from traceback 

82 lines = [line for line in lines if __file__ not in line] 

83 

84 return "".join(lines).strip() 

85 

86 

87class HermesClientCache(LocalCache): 

88 """Hermes client data to cache""" 

89 

90 def __init__(self, from_json_dict: dict[str, Any] = {}): 

91 super().__init__( 

92 jsondataattr=[ 

93 "queueErrors", 

94 "datamodelWarnings", 

95 "exception", 

96 "initstartoffset", 

97 "initstopoffset", 

98 "nextoffset", 

99 ], 

100 ) 

101 

102 self.queueErrors: dict[str, str] = from_json_dict.get("queueErrors", {}) 

103 """Dictionary containing current objects in error queue, for notifications""" 

104 

105 self.datamodelWarnings: dict[str, dict[str, dict[str, Any]]] = ( 

106 from_json_dict.get("datamodelWarnings", {}) 

107 ) 

108 """Dictionary containing current datamodel warnings, for notifications""" 

109 

110 self.exception: str | None = from_json_dict.get("exception") 

111 """String containing latest exception trace""" 

112 

113 self.initstartoffset: Any | None = from_json_dict.get("initstartoffset") 

114 """Contains the offset of the first message of initSync sequence on message 

115 bus""" 

116 self.initstopoffset: Any | None = from_json_dict.get("initstopoffset") 

117 """Contains the offset of the last message of initSync sequence on message 

118 bus""" 

119 self.nextoffset: Any | None = from_json_dict.get("nextoffset") 

120 """Contains the offset of the next message to process on message bus""" 

121 

122 def savecachefile(self, cacheFilename: str | None = None): 

123 """Override method only to disable backup files in cache""" 

124 return super().savecachefile(cacheFilename, dontKeepBackup=True) 

125 

126 

127class GenericClient: 

128 """Superclass of all hermes-client implementations. 

129 Manage all the internals of hermes for a client: datamodel updates, caching, error 

130 management, trashbin and converting messages from message bus into events handlers 

131 calls""" 

132 

133 __FOREIGNKEYS_POLICIES: dict[str, tuple[str]] = { 

134 "disabled": tuple(), 

135 "on_remove_event": ("removed",), 

136 "on_every_event": ("added", "modified", "removed"), 

137 } 

138 """Different foreignkeys_policy settings : associate each foreignkeys_policy 

139 (as key) with the list of event types that will be placed in the error queue if the 

140 object concerning them is the parent (by foreign key) of an object already present 

141 in the error queue""" 

142 

143 def __init__(self, config: HermesConfig): 

144 """Instantiate a new client""" 

145 

146 __hermes__.logger.info(f"Starting {config['appname']} v{HERMES_VERSION}") 

147 

148 # Setup the signals handler 

149 config.setSignalsHandler(self.__signalHandler) 

150 

151 self.__config: HermesConfig = config 

152 """Current config""" 

153 try: 

154 self.config: dict[str, Any] = self.__config[self.__config["appname"]] 

155 """Dict containing the client plugin configuration""" 

156 except KeyError: 

157 self.config = {} 

158 

159 self.__previousconfig: HermesConfig = HermesConfig.loadcachefile( 

160 "_hermesconfig" 

161 ) 

162 """Previous config (from cache)""" 

163 

164 self.__msgbus: AbstractMessageBusConsumerPlugin = self.__config["hermes"][ 

165 "plugins" 

166 ]["messagebus"]["plugininstance"] 

167 self.__msgbus.setTimeout( 

168 self.__config["hermes-client"]["updateInterval"] * 1000 

169 ) 

170 

171 self.__cache: HermesClientCache = HermesClientCache.loadcachefile( 

172 f"_{self.__config['appname']}" 

173 ) 

174 """Cached attributes""" 

175 self.__cache.setCacheFilename(f"_{self.__config['appname']}") 

176 

177 self.__startTime: datetime | None = None 

178 """Datetime when mainloop was started""" 

179 

180 self.__isPaused: datetime | None = None 

181 """Contains pause datetime if standard processing is paused, None otherwise""" 

182 

183 self.__isStopped: bool = False 

184 """mainloop() will run until this var is set to True""" 

185 

186 self.__numberOfLoopToProcess: int | None = None 

187 """**For functionnal tests only**, if a value is set, will process for *value* 

188 iterations of mainloop and pause execution until a new positive value is set""" 

189 

190 self.__sock: SockServer | None = None 

191 """Facultative socket to allow cli communication""" 

192 if ( 

193 self.__config["hermes"]["cli_socket"]["path"] is not None 

194 or self.__config["hermes"]["cli_socket"]["dont_manage_sockfile"] is not None 

195 ): 

196 self.__sock = SockServer( 

197 path=self.__config["hermes"]["cli_socket"]["path"], 

198 owner=self.__config["hermes"]["cli_socket"]["owner"], 

199 group=self.__config["hermes"]["cli_socket"]["group"], 

200 mode=self.__config["hermes"]["cli_socket"]["mode"], 

201 processHdlr=self.__processSocketMessage, 

202 dontManageSockfile=self.__config["hermes"]["cli_socket"][ 

203 "dont_manage_sockfile" 

204 ], 

205 ) 

206 self.__setupSocketParser() 

207 

208 self.__useFirstInitsyncSequence: bool = self.__config["hermes-client"][ 

209 "useFirstInitsyncSequence" 

210 ] 

211 """Indicate if we prefer using the first/oldest or last/most recent 

212 initsync sequence available on message bus""" 

213 

214 self.__newdatamodel: Datamodel = Datamodel(config=self.__config) 

215 """New datamodel (from current config)""" 

216 

217 if self.__previousconfig.hasData(): 

218 # Start app with previous datamodel to be able to check differences with 

219 # new one through self.__processDatamodelUpdate() 

220 self.__datamodel: Datamodel = Datamodel(config=self.__previousconfig) 

221 """Current datamodel""" 

222 else: 

223 # As no previous datamodel is available, start app with new one 

224 self.__datamodel = self.__newdatamodel 

225 

226 self.__trashbin_retention: timedelta | None = None 

227 """Timedelta with delay to keep removed data in trashbin before permanently 

228 deleting it. 'None' means no trashbin""" 

229 if self.__config["hermes-client"]["trashbin_retention"] > 0: 

230 self.__trashbin_retention = timedelta( 

231 days=self.__config["hermes-client"]["trashbin_retention"] 

232 ) 

233 

234 self.__trashbin_purgeInterval: timedelta | None = timedelta( 

235 minutes=self.__config["hermes-client"]["trashbin_purgeInterval"] 

236 ) 

237 """Timedelta with delay between two trashbin purge attempts""" 

238 

239 self.__trashbin_lastpurge: datetime = datetime(year=1, month=1, day=1) 

240 """Datetime when latest trashbin purge was ran""" 

241 

242 self.__errorQueue_retryInterval: timedelta | None = timedelta( 

243 minutes=self.__config["hermes-client"]["errorQueue_retryInterval"] 

244 ) 

245 """Timedelta with delay between two attempts of processing events in error""" 

246 

247 self.__errorQueue_lastretry: datetime = datetime(year=1, month=1, day=1) 

248 """Datetime when latest error queue retry was ran""" 

249 

250 self.__currentStep: int = 0 

251 """Store the step number of current event processing. Will be stored in events 

252 in error queue to allow clients to resume an event where it has failed""" 

253 

254 self.__isPartiallyProcessed: bool = False 

255 """Store if some data has been processed during current event processing. Will 

256 be stored in events in error queue to handle autoremediation properly""" 

257 

258 self.__isAnErrorRetry: bool = False 

259 """Indicate to handler whether the current event is being processed as part of 

260 an error retry""" 

261 

262 self.__saveRequired: bool = False 

263 """Reset to False at each loop start, and if any change is made during 

264 processing, set to True in order to save all cache at the loop end. 

265 Used to avoid expensive .save() calls when unnecessary 

266 """ 

267 

268 self.__foreignkeys_events: tuple[str] = self.__FOREIGNKEYS_POLICIES[ 

269 self.__config["hermes-client"]["foreignkeys_policy"] 

270 ] 

271 """List of event types that will be placed in the error queue if the object 

272 concerning them is the parent (by foreign key) of an object already present in 

273 the error queue""" 

274 

275 def getObjectFromCache(self, objtype: str, objpkey: Any) -> DataObject: 

276 """Returns a deepcopy of an object from cache. 

277 Raise IndexError if objtype is invalid, or if objpkey is not found 

278 """ 

279 ds: Datasource = self.__datamodel.localdata 

280 

281 (_, obj) = Datamodel.getObjectFromCacheOrTrashbin(ds, objtype, objpkey) 

282 if obj is None: 

283 raise IndexError( 

284 f"No object of {objtype=} with {objpkey=} was found in cache" 

285 ) 

286 

287 return deepcopy(obj) 

288 

289 def getDataobjectlistFromCache(self, objtype: str) -> DataObjectList: 

290 """Returns cache of specified objtype, by reference. 

291 WARNING: Any modification of the cache content will mess up your client !!! 

292 Raise IndexError if objtype is invalid 

293 """ 

294 ds: Datasource = self.__datamodel.localdata 

295 cache = ds[objtype] 

296 trashbin = ds[f"trashbin_{objtype}"] 

297 

298 # Create an empty DataObjectList of same type as cache 

299 res = type(cache)(objlist=[]) 

300 

301 res.extend(cache) 

302 res.extend(trashbin) 

303 return res 

304 

305 def __signalHandler(self, signalnumber: int, frame: FrameType | None): 

306 """Signal handler that will be called on SIGINT and SIGTERM""" 

307 __hermes__.logger.critical( 

308 f"Signal '{signal.strsignal(signalnumber)}' received, terminating" 

309 ) 

310 self.__isStopped = True 

311 

312 def __setupSocketParser(self): 

313 """Set up the argparse context for unix socket commands""" 

314 self.__parser = SocketArgumentParser( 

315 prog=f"{self.__config['appname']}-cli", 

316 description=f"Hermes client {self.__config['appname']} CLI", 

317 exit_on_error=False, 

318 ) 

319 

320 subparsers = self.__parser.add_subparsers(help="Sub-commands") 

321 

322 # Quit 

323 sp_quit = subparsers.add_parser("quit", help=f"Stop {self.__config['appname']}") 

324 sp_quit.set_defaults(func=self.__sock_quit) 

325 

326 # Pause 

327 sp_pause = subparsers.add_parser( 

328 "pause", help="Pause processing until 'resume' command is sent" 

329 ) 

330 sp_pause.set_defaults(func=self.__sock_pause) 

331 

332 # Resume 

333 sp_resume = subparsers.add_parser( 

334 "resume", help="Resume processing that has been paused with 'pause'" 

335 ) 

336 sp_resume.set_defaults(func=self.__sock_resume) 

337 

338 # Status 

339 sp_status = subparsers.add_parser( 

340 "status", help=f"Show {self.__config['appname']} status" 

341 ) 

342 sp_status.set_defaults(func=self.__sock_status) 

343 sp_status.add_argument( 

344 "-j", 

345 "--json", 

346 action="store_const", 

347 const=True, 

348 default=False, 

349 help="Print status as json", 

350 ) 

351 sp_status.add_argument( 

352 "-v", 

353 "--verbose", 

354 action="store_const", 

355 const=True, 

356 default=False, 

357 help="Output items without values", 

358 ) 

359 

360 def __processSocketMessage( 

361 self, msg: SocketMessageToServer 

362 ) -> SocketMessageToClient: 

363 """Handler that process specified msg received on unix socket and returns the 

364 answer to send""" 

365 reply: SocketMessageToClient | None = None 

366 

367 try: 

368 args = self.__parser.parse_args(msg.argv) 

369 if "func" not in args: 

370 raise SocketParsingMessage(self.__parser.format_help()) 

371 except (SocketParsingError, SocketParsingMessage) as e: 

372 retmsg = str(e) 

373 except argparse.ArgumentError as e: 

374 retmsg = self.__parser.format_error(str(e)) 

375 else: 

376 try: 

377 reply = args.func(args) 

378 except Exception as e: 

379 lines = traceback.format_exception(type(e), e, e.__traceback__) 

380 trace = "".join(lines).strip() 

381 __hermes__.logger.critical(f"Unhandled exception: {trace}") 

382 retmsg = trace 

383 

384 if reply is None: # Error was met 

385 reply = SocketMessageToClient(retcode=1, retmsg=retmsg) 

386 

387 return reply 

388 

389 def __sock_quit(self, args: argparse.Namespace) -> SocketMessageToClient: 

390 """Handler called when quit subcommand is requested on unix socket""" 

391 self.__isStopped = True 

392 __hermes__.logger.info(f"{self.__config['appname']} has been requested to quit") 

393 return SocketMessageToClient(retcode=0, retmsg="") 

394 

395 def __sock_pause(self, args: argparse.Namespace) -> SocketMessageToClient: 

396 """Handler called when pause subcommand is requested on unix socket""" 

397 if self.__isStopped: 

398 return SocketMessageToClient( 

399 retcode=1, 

400 retmsg=f"Error: {self.__config['appname']} is currently being stopped", 

401 ) 

402 

403 if self.__isPaused: 

404 return SocketMessageToClient( 

405 retcode=1, 

406 retmsg=f"Error: {self.__config['appname']} is already paused", 

407 ) 

408 

409 __hermes__.logger.info( 

410 f"{self.__config['appname']} has been requested to pause" 

411 ) 

412 self.__isPaused = datetime.now() 

413 return SocketMessageToClient(retcode=0, retmsg="") 

414 

415 def __sock_resume(self, args: argparse.Namespace) -> SocketMessageToClient: 

416 """Handler called when resume subcommand is requested on unix socket""" 

417 if self.__isStopped: 

418 return SocketMessageToClient( 

419 retcode=1, 

420 retmsg=f"Error: {self.__config['appname']} is currently being stopped", 

421 ) 

422 

423 if not self.__isPaused: 

424 return SocketMessageToClient( 

425 retcode=1, retmsg=f"Error: {self.__config['appname']} is not paused" 

426 ) 

427 

428 __hermes__.logger.info( 

429 f"{self.__config['appname']} has been requested to resume" 

430 ) 

431 self.__isPaused = None 

432 return SocketMessageToClient(retcode=0, retmsg="") 

433 

434 def __sock_status(self, args: argparse.Namespace) -> SocketMessageToClient: 

435 """Handler called when status subcommand is requested on unix socket""" 

436 status = self.__status(verbose=args.verbose) 

437 if args.json: 

438 msg = json.dumps(status, cls=JSONEncoder, indent=4) 

439 else: 

440 nl = "\n" 

441 info2printable = {} 

442 msg = "" 

443 for objname in [self.__config["appname"]] + list( 

444 status.keys() - (self.__config["appname"],) 

445 ): 

446 infos = status[objname] 

447 msg += f"{objname}:{nl}" 

448 for category in ("information", "warning", "error"): 

449 if category not in infos: 

450 continue 

451 if not infos[category]: 

452 msg += f" * {category.capitalize()}: []{nl}" 

453 continue 

454 

455 msg += f" * {category.capitalize()}{nl}" 

456 for infoname, infodata in infos[category].items(): 

457 indentedinfodata = str(infodata).replace("\n", "\n ") 

458 msg += ( 

459 f" - {info2printable.get(infoname, infoname)}:" 

460 f" {indentedinfodata}{nl}" 

461 ) 

462 msg = msg.rstrip() 

463 

464 return SocketMessageToClient(retcode=0, retmsg=msg) 

465 

466 @property 

467 def currentStep(self) -> int: 

468 """Step number of current event processed. 

469 Allow clients to resume an event where it has failed""" 

470 return self.__currentStep 

471 

472 @currentStep.setter 

473 def currentStep(self, value: int): 

474 if type(value) is not int: 

475 raise TypeError( 

476 f"Specified step {value=} has invalid type '{type(value)}'" 

477 " instead of int" 

478 ) 

479 

480 if value < 0: 

481 raise ValueError(f"Specified step {value=} must be greater or equal to 0") 

482 

483 self.__currentStep = value 

484 

485 @property 

486 def isPartiallyProcessed(self) -> bool: 

487 """Indicate if some data has been processed during current event processing. 

488 Required to handle autoremediation properly""" 

489 return self.__isPartiallyProcessed 

490 

491 @isPartiallyProcessed.setter 

492 def isPartiallyProcessed(self, value: bool): 

493 if type(value) is not bool: 

494 raise TypeError( 

495 f"Specified isPartiallyProcessed {value=} has invalid type" 

496 f" '{type(value)}' instead of bool" 

497 ) 

498 

499 self.__isPartiallyProcessed = value 

500 

501 @property 

502 def isAnErrorRetry(self) -> bool: 

503 """Read-only attribute that indicates to handler whether the current event is 

504 being processed as part of an error retry""" 

505 return self.__isAnErrorRetry 

506 

507 def mainLoop(self): 

508 """Client main loop""" 

509 self.__startTime = datetime.now() 

510 

511 self.__checkDatamodelWarnings() 

512 # TODO: implement a check to ensure subclasses required data types and 

513 # attributes exist in datamodel 

514 

515 if self.__datamodel.hasRemoteSchema(): 

516 __hermes__.logger.debug( 

517 "Remote Dataschema in cache:" 

518 f" {self.__datamodel.remote_schema.to_json()=}" 

519 ) 

520 self.__datamodel.loadErrorQueue() 

521 else: 

522 __hermes__.logger.debug("No remote Dataschema in cache yet") 

523 

524 if self.__sock is not None: 

525 self.__sock.startProcessMessagesDaemon(appname=__hermes__.appname) 

526 

527 # Reduce sleep duration during functional tests to speed them up 

528 sleepDuration = 1 if self.__numberOfLoopToProcess is None else 0.05 

529 

530 isFirstLoopIteration: bool = True 

531 while not self.__isStopped: 

532 self.__saveRequired = False 

533 

534 try: 

535 with self.__msgbus: 

536 if self.__isPaused or self.__numberOfLoopToProcess == 0: 

537 sleep(sleepDuration) 

538 continue 

539 

540 try: 

541 if self.__hasAlreadyBeenInitialized(): 

542 if isFirstLoopIteration: 

543 try: 

544 self.__processDatamodelUpdate() 

545 except Exception as e: 

546 self.__notifyFatalException( 

547 HermesClientHandlerError.exceptionToString( 

548 e, purgeCurrentFileFromTrace=False 

549 ) 

550 ) 

551 raise HermesAlreadyNotifiedException 

552 self.__retryErrorQueue() 

553 self.__emptyTrashBin() 

554 self.__processEvents(isInitSync=False) 

555 else: 

556 __hermes__.logger.info( 

557 "Client hasn't ran its first initsync sequence yet" 

558 ) 

559 if self.__canBeInitialized(): 

560 __hermes__.logger.info( 

561 "First initsync sequence processing begins" 

562 ) 

563 self.__processEvents(isInitSync=True) 

564 if self.__hasAlreadyBeenInitialized(): 

565 __hermes__.logger.info( 

566 "First initsync sequence processing completed" 

567 ) 

568 else: 

569 __hermes__.logger.info( 

570 "No initsync sequence is available on message bus." 

571 " Retry..." 

572 ) 

573 

574 self.__notifyException(None) 

575 

576 except HermesAlreadyNotifiedException: 

577 pass 

578 except InvalidDataError as e: 

579 self.__notifyFatalException( 

580 HermesClientHandlerError.exceptionToString( 

581 e, purgeCurrentFileFromTrace=False 

582 ) 

583 ) 

584 except Exception as e: 

585 self.__notifyException( 

586 HermesClientHandlerError.exceptionToString( 

587 e, purgeCurrentFileFromTrace=False 

588 ) 

589 ) 

590 finally: 

591 isFirstLoopIteration = False 

592 # Still could be True if an exception was raised in 

593 # __retryErrorQueue() 

594 self.__isAnErrorRetry = False 

595 except Exception as e: 

596 __hermes__.logger.warning( 

597 "Message bus seems to be unavailable." 

598 " Waiting 60 seconds before retrying" 

599 ) 

600 self.__notifyException( 

601 HermesClientHandlerError.exceptionToString( 

602 e, purgeCurrentFileFromTrace=False 

603 ) 

604 ) 

605 # Wait one second 60 times to avoid waiting too long before stopping 

606 for i in range(60): 

607 if self.__isStopped: 

608 break 

609 sleep(1) 

610 finally: 

611 if self.__saveRequired or self.__isStopped: 

612 self.__datamodel.saveErrorQueue() 

613 if self.__hasAtLeastBeganInitialization(): 

614 self.__datamodel.saveLocalAndRemoteData() 

615 # Reset current step for "on_save" event 

616 self.currentStep = 0 

617 self.isPartiallyProcessed = False 

618 # Call special event "on_save()" 

619 self.__callHandler("", "save") 

620 self.__notifyQueueErrors() 

621 self.__cache.savecachefile() 

622 

623 if self.__isStopped: 

624 # Only to ensure cache files version update is saved, 

625 # to avoid version migrations at each restart, 

626 # as those files aren't expected to be updated often 

627 self.__config.savecachefile() 

628 self.__datamodel.remote_schema.savecachefile() 

629 

630 # Only used in functionnal tests 

631 if self.__numberOfLoopToProcess: 

632 self.__numberOfLoopToProcess -= 1 

633 

634 def __retryErrorQueue(self): 

635 # Enforce retryInterval 

636 now = datetime.now() 

637 if now < self.__errorQueue_lastretry + self.__errorQueue_retryInterval: 

638 return # Too early to process again 

639 

640 # All events processed in __retryErrorQueue() require this attribute to be True 

641 self.__isAnErrorRetry = True 

642 

643 done = False 

644 evNumbersToRetry: list[int] = [] 

645 eventNumber: int 

646 localEvent: Event 

647 remoteEvent: Event | None 

648 while not done: 

649 retryQueue: list[int] = [] 

650 previousKeys = self.__datamodel.errorqueue.keys() 

651 

652 for ( 

653 eventNumber, 

654 remoteEvent, 

655 localEvent, 

656 errorMsg, 

657 ) in self.__datamodel.errorqueue: 

658 if evNumbersToRetry and eventNumber not in evNumbersToRetry: 

659 # Ignore eventNumber absent from evNumbersToRetry, 

660 # excepted on first iteration of while loop 

661 continue 

662 

663 if remoteEvent is not None: 

664 if self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

665 remoteEvent, False 

666 ): 

667 __hermes__.logger.info( 

668 f"Won't retry remote event {remoteEvent} from error queue" 

669 " as it is still a dependency of another error" 

670 ) 

671 retryQueue.append(eventNumber) 

672 continue 

673 

674 __hermes__.logger.info( 

675 f"Retrying to process remote event {remoteEvent} from error" 

676 " queue" 

677 ) 

678 try: 

679 self.__processRemoteEvent( 

680 remoteEvent, localEvent, enqueueEventWithError=False 

681 ) 

682 except HermesClientHandlerError as e: 

683 __hermes__.logger.info( 

684 f"... failed on step {self.currentStep}: {str(e)}" 

685 ) 

686 remoteEvent.step = self.currentStep 

687 remoteEvent.isPartiallyProcessed = self.isPartiallyProcessed 

688 localEvent.step = self.currentStep 

689 localEvent.isPartiallyProcessed = self.isPartiallyProcessed 

690 self.__datamodel.errorqueue.updateErrorMsg(eventNumber, e.msg) 

691 else: 

692 # If event has suppressed object, eventNumber has already been 

693 # purged from queue 

694 self.__datamodel.errorqueue.remove( 

695 eventNumber, ignoreMissingEventNumber=True 

696 ) 

697 else: 

698 if self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

699 localEvent, True 

700 ): 

701 __hermes__.logger.info( 

702 f"Won't retry local event {localEvent} from error queue" 

703 " as it is still a dependency of another error" 

704 ) 

705 retryQueue.append(eventNumber) 

706 continue 

707 __hermes__.logger.info( 

708 f"Retrying to process local event {localEvent} from error queue" 

709 ) 

710 try: 

711 self.__processLocalEvent( 

712 remoteEvent, localEvent, enqueueEventWithError=False 

713 ) 

714 except HermesClientHandlerError as e: 

715 __hermes__.logger.info( 

716 f"... failed on step {self.currentStep}: {str(e)}" 

717 ) 

718 localEvent.step = self.currentStep 

719 localEvent.isPartiallyProcessed = self.isPartiallyProcessed 

720 self.__datamodel.errorqueue.updateErrorMsg(eventNumber, e.msg) 

721 else: 

722 # If event has suppressed object, eventNumber has already been 

723 # purged from queue 

724 self.__datamodel.errorqueue.remove( 

725 eventNumber, ignoreMissingEventNumber=True 

726 ) 

727 while self.__isPaused and not self.__isStopped: 

728 sleep(1) # Allow loop to be paused 

729 if self.__isStopped: 

730 break # Allow loop to be interrupted if requested 

731 else: 

732 # Update __errorQueue_lastretry only if loop hasn't been interrupted 

733 self.__errorQueue_lastretry = now 

734 

735 done = previousKeys == self.__datamodel.errorqueue.keys() or not retryQueue 

736 if done: 

737 if previousKeys: 

738 __hermes__.logger.debug( 

739 f"End of retryerrorqueue {previousKeys=}" 

740 f" {self.__datamodel.errorqueue.keys()=} - {retryQueue=}" 

741 ) 

742 else: 

743 __hermes__.logger.debug( 

744 "As some event have been processed, will retry ignored events" 

745 f" {retryQueue}" 

746 ) 

747 evNumbersToRetry = retryQueue.copy() 

748 

749 self.__isAnErrorRetry = False # End of __retryErrorQueue() 

750 

751 def __emptyTrashBin(self, force: bool = False): 

752 # Enforce purgeInterval 

753 now = datetime.now() 

754 if ( 

755 not force 

756 and now < self.__trashbin_lastpurge + self.__trashbin_purgeInterval 

757 ): 

758 return # Too early to process again 

759 if self.__trashbin_retention is not None: 

760 retentionLimit = datetime.now() - self.__trashbin_retention 

761 

762 objtype: str 

763 objs: DataObjectList 

764 # As we'll remove objects, process data in the datamodel reversed declaration 

765 # order 

766 for objtype, objs in reversed(self.__datamodel.remotedata.items()): 

767 if not objtype.startswith("trashbin_"): 

768 continue 

769 

770 for pkey in objs.getPKeys(): 

771 obj = objs.get(pkey) 

772 if ( 

773 self.__trashbin_retention is None 

774 or obj._trashbin_timestamp < retentionLimit 

775 ): 

776 event = Event( 

777 evcategory="base", eventtype="removed", obj=obj, objattrs={} 

778 ) 

779 __hermes__.logger.info(f"Trying to purge {repr(obj)} from trashbin") 

780 if self.__datamodel.errorqueue.containsObjectByEvent( 

781 event, isLocalEvent=False 

782 ) and not self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

783 event, isLocalEvent=False 

784 ): 

785 try: 

786 self.__processRemoteEvent( 

787 event, local_event=None, enqueueEventWithError=False 

788 ) 

789 except HermesClientHandlerError: 

790 pass 

791 else: 

792 self.__processRemoteEvent( 

793 event, local_event=None, enqueueEventWithError=True 

794 ) 

795 

796 while self.__isPaused and not self.__isStopped: 

797 sleep(1) # Allow loop to be paused 

798 if self.__isStopped: 

799 break # Allow loop to be interrupted if requested 

800 

801 while self.__isPaused and not self.__isStopped: 

802 sleep(1) # Allow loop to be paused 

803 if self.__isStopped: 

804 break # Allow loop to be interrupted if requested 

805 else: 

806 # Update __trashbin_lastpurge only if loop hasn't been interrupted 

807 self.__trashbin_lastpurge = now 

808 

809 def __hasAlreadyBeenInitialized(self) -> bool: 

810 if ( 

811 self.__cache.initstartoffset is None 

812 or self.__cache.initstopoffset is None 

813 or self.__cache.nextoffset is None 

814 or self.__cache.nextoffset < self.__cache.initstopoffset 

815 ): 

816 return False 

817 return True 

818 

819 def __hasAtLeastBeganInitialization(self) -> bool: 

820 return ( 

821 self.__cache.initstartoffset is not None 

822 and self.__datamodel.hasRemoteSchema() 

823 ) 

824 

825 def __canBeInitialized(self) -> bool: 

826 self.__msgbus.seekToBeginning() 

827 

828 # List of (start, stop) offsets of initsync sequences found 

829 initSyncFound: list[tuple[Any, Any]] = [] 

830 

831 start = None 

832 stop = None 

833 event: Event 

834 for event in self.__msgbus: 

835 if event.evcategory != "initsync": 

836 continue 

837 if event.eventtype == "init-start": 

838 start = event.offset 

839 elif event.eventtype == "init-stop" and start is not None: 

840 stop = event.offset 

841 initSyncFound.append( 

842 (start, stop), 

843 ) 

844 if self.__useFirstInitsyncSequence: 

845 break # We found the first sequence 

846 else: 

847 # Continue to find a new complete sequence 

848 start = None 

849 stop = None 

850 

851 if not initSyncFound: 

852 return False 

853 

854 if self.__useFirstInitsyncSequence: 

855 start, stop = initSyncFound[0] 

856 else: 

857 start, stop = initSyncFound[-1] 

858 

859 if self.__cache.nextoffset is None or self.__cache.nextoffset < start: 

860 self.__cache.nextoffset = start 

861 

862 self.__cache.initstartoffset = start 

863 self.__cache.initstopoffset = stop 

864 __hermes__.logger.debug( 

865 "Init sequence was found in Kafka at offsets" 

866 f" [{self.__cache.initstartoffset} ; {self.__cache.initstopoffset}]" 

867 ) 

868 return True 

869 

870 def __updateSchema(self, newSchema: Dataschema): 

871 if self.__datamodel.forcePurgeOfTrashedObjectsWithoutNewPkeys( 

872 self.__datamodel.remote_schema, newSchema 

873 ): 

874 self.__emptyTrashBin(force=True) 

875 

876 self.__datamodel.updateSchema(newSchema) 

877 self.__config.savecachefile() # Save config to be able to rebuild datamodel 

878 # Save and reload error queue to purge it from events of any suppressed types 

879 self.__datamodel.saveErrorQueue() 

880 self.__datamodel.loadErrorQueue() 

881 self.__checkDatamodelWarnings() 

882 

883 def __checkDatamodelWarnings(self): 

884 if self.__datamodel.unknownRemoteTypes: 

885 __hermes__.logger.warning( 

886 "Datamodel errors: remote types" 

887 f" '{self.__datamodel.unknownRemoteTypes}'" 

888 " don't exist in current Dataschema" 

889 ) 

890 

891 if self.__datamodel.unknownRemoteAttributes: 

892 __hermes__.logger.warning( 

893 "Datamodel errors: remote attributes don't exist in current" 

894 f" Dataschema: {self.__datamodel.unknownRemoteAttributes}" 

895 ) 

896 self.__notifyDatamodelWarnings() 

897 

898 def __processEvents(self, isInitSync=False): 

899 remote_event: Event 

900 schema: Dataschema | None = None 

901 evcategory: str = "initsync" if isInitSync else "base" 

902 

903 self.__msgbus.seek(self.__cache.nextoffset) 

904 

905 for remote_event in self.__msgbus: 

906 self.__saveRequired = True 

907 if isInitSync and remote_event.offset > self.__cache.initstopoffset: 

908 # Should never be called 

909 self.__cache.nextoffset = remote_event.offset + 1 

910 break 

911 

912 # TODO: implement data consistency check if event.evcategory==initsync 

913 # and evcategory==base 

914 

915 if remote_event.evcategory != evcategory: 

916 self.__cache.nextoffset = remote_event.offset + 1 

917 continue 

918 

919 if isInitSync: 

920 if remote_event.eventtype == "init-start": 

921 schema = Dataschema.from_json(remote_event.objattrs) 

922 self.__updateSchema(schema) 

923 continue 

924 

925 if remote_event.eventtype == "init-stop": 

926 self.__cache.nextoffset = remote_event.offset + 1 

927 break 

928 

929 if schema is None and not self.__hasAtLeastBeganInitialization(): 

930 msg = "Invalid initsync sequence met, ignoring" 

931 __hermes__.logger.critical(msg) 

932 return 

933 

934 # Process "standard" message 

935 match remote_event.eventtype: 

936 case "added" | "modified" | "removed": 

937 self.__processRemoteEvent( 

938 remote_event, local_event=None, enqueueEventWithError=True 

939 ) 

940 case "dataschema": 

941 schema = Dataschema.from_json(remote_event.objattrs) 

942 self.__updateSchema(schema) 

943 case _: 

944 __hermes__.logger.error( 

945 "Received an event with unknown type" 

946 f" '{remote_event.eventtype}': ignored" 

947 ) 

948 

949 self.__cache.nextoffset = remote_event.offset + 1 

950 

951 while self.__isPaused and not self.__isStopped: 

952 sleep(1) # Allow loop to be paused 

953 if self.__isStopped: 

954 break # Allow loop to be interrupted if requested 

955 

956 def __processRemoteEvent( 

957 self, 

958 remote_event: Event | None, 

959 local_event: Event | None, 

960 enqueueEventWithError: bool, 

961 simulateOnly: bool = False, 

962 ): 

963 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

964 remote_event.objtype 

965 ) 

966 __hermes__.logger.debug( 

967 f"__processRemoteEvent({remote_event.toString(secretAttrs)})" 

968 ) 

969 self.__saveRequired = True 

970 

971 # In case of modification, try to compose full object in order to provide all 

972 # attributes values, in order to render Jinja Template with several vars. 

973 # In this specific case, one of the template var may have been modified, 

974 # but not the other, so the event attrs are not enough to process the 

975 # template rendering 

976 r_obj_complete: DataObject | None = None 

977 if remote_event.eventtype == "modified": 

978 cache_complete, r_cachedobj_complete = ( 

979 Datamodel.getObjectFromCacheOrTrashbin( 

980 self.__datamodel.remotedata_complete, 

981 remote_event.objtype, 

982 remote_event.objpkey, 

983 ) 

984 ) 

985 if r_cachedobj_complete is not None: 

986 r_obj_complete = Datamodel.getUpdatedObject( 

987 r_cachedobj_complete, remote_event.objattrs 

988 ) 

989 

990 # Should be always None, except when called from __retryErrorQueue() 

991 # In this case, we have to use the provided local_event, as it may contains 

992 # some extra changes stacked by autoremediation 

993 if local_event is None: 

994 local_event = self.__datamodel.convertEventToLocal( 

995 remote_event, r_obj_complete 

996 ) 

997 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"] 

998 

999 if not simulateOnly and enqueueEventWithError: 

1000 hadErrors = self.__datamodel.errorqueue.containsObjectByEvent( 

1001 remote_event, isLocalEvent=False 

1002 ) 

1003 isParent = self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

1004 remote_event, isLocalEvent=False 

1005 ) 

1006 if hadErrors or ( 

1007 isParent and remote_event.eventtype in self.__foreignkeys_events 

1008 ): 

1009 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1010 remote_event.objtype 

1011 ) 

1012 if hadErrors: 

1013 errorMsg = ( 

1014 f"Object in remote event {remote_event.toString(secretAttrs)}" 

1015 " already had unresolved errors: appending event to error queue" 

1016 ) 

1017 else: 

1018 errorMsg = ( 

1019 f"Object in remote event {remote_event.toString(secretAttrs)}" 

1020 " is a dependency of an object that already had unresolved" 

1021 " errors: appending event to error queue" 

1022 ) 

1023 __hermes__.logger.warning(errorMsg) 

1024 self.__processRemoteEvent( 

1025 remote_event, 

1026 local_event=None, 

1027 enqueueEventWithError=False, 

1028 simulateOnly=True, 

1029 ) 

1030 if local_event is None: 

1031 # Force empty event generation when local_event doesn't change 

1032 # anything 

1033 local_event = self.__datamodel.convertEventToLocal( 

1034 remote_event, r_obj_complete, allowEmptyEvent=True 

1035 ) 

1036 self.__datamodel.errorqueue.append(remote_event, local_event, errorMsg) 

1037 return 

1038 

1039 try: 

1040 match remote_event.eventtype: 

1041 case "added": 

1042 if ( 

1043 self.__trashbin_retention is not None 

1044 and remote_event.objpkey in trashbin 

1045 ): 

1046 # Object is in trashbin, recycle it 

1047 self.__remoteRecycled(remote_event, local_event, simulateOnly) 

1048 else: 

1049 # Add new object 

1050 self.__remoteAdded(remote_event, local_event, simulateOnly) 

1051 

1052 case "modified": 

1053 self.__remoteModified(remote_event, local_event, simulateOnly) 

1054 

1055 case "removed": 

1056 # Remove object on any of these conditions: 

1057 # - trashbin retention is disabled 

1058 # - object is already in trashbin 

1059 if ( 

1060 self.__trashbin_retention is None 

1061 or remote_event.objpkey in trashbin 

1062 ): 

1063 # Remove object 

1064 self.__remoteRemoved(remote_event, local_event, simulateOnly) 

1065 else: 

1066 # Store object in trashbin 

1067 self.__remoteTrashed(remote_event, local_event, simulateOnly) 

1068 except HermesClientHandlerError as e: 

1069 if not simulateOnly and enqueueEventWithError: 

1070 self.__processRemoteEvent( 

1071 remote_event, 

1072 local_event=None, 

1073 enqueueEventWithError=False, 

1074 simulateOnly=True, 

1075 ) 

1076 remote_event.step = self.currentStep 

1077 remote_event.isPartiallyProcessed = self.isPartiallyProcessed 

1078 local_event.step = self.currentStep 

1079 local_event.isPartiallyProcessed = self.isPartiallyProcessed 

1080 

1081 if local_event is None: 

1082 # Force empty event generation when local_event doesn't change 

1083 # anything 

1084 local_event = self.__datamodel.convertEventToLocal( 

1085 remote_event, r_obj_complete, allowEmptyEvent=True 

1086 ) 

1087 self.__datamodel.errorqueue.append(remote_event, local_event, e.msg) 

1088 else: 

1089 raise 

1090 

1091 def __processLocalEvent( 

1092 self, 

1093 remote_event: Event | None, 

1094 local_event: Event | None, 

1095 enqueueEventWithError: bool, 

1096 simulateOnly: bool = False, 

1097 ): 

1098 if local_event is None: 

1099 __hermes__.logger.debug("__processLocalEvent(None)") 

1100 return 

1101 

1102 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1103 local_event.objtype 

1104 ) 

1105 __hermes__.logger.debug( 

1106 f"__processLocalEvent({local_event.toString(secretAttrs)})" 

1107 ) 

1108 

1109 self.__saveRequired = True 

1110 

1111 if not simulateOnly: 

1112 # Reset current step 

1113 self.currentStep = local_event.step 

1114 self.isPartiallyProcessed = local_event.isPartiallyProcessed 

1115 

1116 if not simulateOnly and enqueueEventWithError: 

1117 hadErrors = self.__datamodel.errorqueue.containsObjectByEvent( 

1118 local_event, isLocalEvent=True 

1119 ) 

1120 isParent = self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

1121 local_event, isLocalEvent=True 

1122 ) 

1123 if hadErrors or ( 

1124 isParent and local_event.eventtype in self.__foreignkeys_events 

1125 ): 

1126 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1127 local_event.objtype 

1128 ) 

1129 if hadErrors: 

1130 errorMsg = ( 

1131 f"Object in local event {local_event.toString(secretAttrs)}" 

1132 " already had unresolved errors: appending event to error queue" 

1133 ) 

1134 else: 

1135 errorMsg = ( 

1136 f"Object in local event {local_event.toString(secretAttrs)}" 

1137 " is a dependency of an object that already had unresolved" 

1138 " errors: appending event to error queue" 

1139 ) 

1140 __hermes__.logger.warning(errorMsg) 

1141 self.__processLocalEvent( 

1142 None, local_event, enqueueEventWithError=False, simulateOnly=True 

1143 ) 

1144 self.__datamodel.errorqueue.append(remote_event, local_event, errorMsg) 

1145 return 

1146 

1147 trashbin = self.__datamodel.localdata[f"trashbin_{local_event.objtype}"] 

1148 try: 

1149 match local_event.eventtype: 

1150 case "added": 

1151 if ( 

1152 self.__trashbin_retention is not None 

1153 and local_event.objpkey in trashbin 

1154 ): 

1155 # Object is in trashbin, recycle it 

1156 self.__localRecycled(local_event, simulateOnly) 

1157 else: 

1158 self.__localAdded(local_event, simulateOnly) 

1159 case "modified": 

1160 if not simulateOnly and local_event.objpkey in trashbin: 

1161 # Object is in trashbin, and cannot be modified until it is 

1162 # restored 

1163 if enqueueEventWithError: 

1164 # As the object changes will be processed at restore, 

1165 # ignore the change 

1166 self.__processLocalEvent( 

1167 None, 

1168 local_event, 

1169 enqueueEventWithError=False, 

1170 simulateOnly=True, 

1171 ) 

1172 else: 

1173 # Propagate error as requested 

1174 raise HermesClientHandlerError( 

1175 f"Object of event {repr(local_event)} is in trashbin," 

1176 " and cannot be modified until it is restored" 

1177 ) 

1178 else: 

1179 self.__localModified(local_event, simulateOnly) 

1180 case "removed": 

1181 # Remove object on any of these conditions: 

1182 # - trashbin retention is disabled 

1183 # - object is already in trashbin 

1184 if ( 

1185 self.__trashbin_retention is None 

1186 or local_event.objpkey in trashbin 

1187 ): 

1188 self.__localRemoved(local_event, simulateOnly) 

1189 else: 

1190 self.__localTrashed(local_event, simulateOnly) 

1191 except HermesClientHandlerError as e: 

1192 if not simulateOnly and enqueueEventWithError: 

1193 self.__processLocalEvent( 

1194 None, local_event, enqueueEventWithError=False, simulateOnly=True 

1195 ) 

1196 if remote_event is not None: 

1197 remote_event.step = self.currentStep 

1198 remote_event.isPartiallyProcessed = self.isPartiallyProcessed 

1199 local_event.step = self.currentStep 

1200 local_event.isPartiallyProcessed = self.isPartiallyProcessed 

1201 self.__datamodel.errorqueue.append(remote_event, local_event, e.msg) 

1202 else: 

1203 raise 

1204 

1205 def __remoteAdded( 

1206 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1207 ): 

1208 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1209 remote_event.objtype 

1210 ) 

1211 __hermes__.logger.debug(f"__remoteAdded({remote_event.toString(secretAttrs)})") 

1212 

1213 r_obj = self.__datamodel.createRemoteDataobject( 

1214 remote_event.objtype, remote_event.objattrs 

1215 ) 

1216 self.__processLocalEvent( 

1217 remote_event, 

1218 local_event, 

1219 enqueueEventWithError=False, 

1220 simulateOnly=simulateOnly, 

1221 ) 

1222 

1223 # Add remote object to cache 

1224 if not simulateOnly: 

1225 self.__datamodel.remotedata[remote_event.objtype].append(r_obj) 

1226 # May already been added if current event is from errorqueue 

1227 if r_obj not in self.__datamodel.remotedata_complete[remote_event.objtype]: 

1228 self.__datamodel.remotedata_complete[remote_event.objtype].append(r_obj) 

1229 

1230 def __localAdded(self, local_ev: Event, simulateOnly: bool = False): 

1231 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1232 local_ev.objtype 

1233 ) 

1234 __hermes__.logger.debug(f"__localAdded({local_ev.toString(secretAttrs)})") 

1235 

1236 l_obj = self.__datamodel.createLocalDataobject( 

1237 local_ev.objtype, local_ev.objattrs 

1238 ) 

1239 

1240 if not simulateOnly: 

1241 # Call added handler 

1242 self.__callHandler( 

1243 objtype=local_ev.objtype, 

1244 eventtype="added", 

1245 objkey=local_ev.objpkey, 

1246 eventattrs=local_ev.objattrs, 

1247 newobj=deepcopy(l_obj), 

1248 ) 

1249 

1250 # Add local object to cache 

1251 if not simulateOnly: 

1252 self.__datamodel.localdata[local_ev.objtype].append(l_obj) 

1253 # May already been added if current event is from errorqueue 

1254 if l_obj not in self.__datamodel.localdata_complete[local_ev.objtype]: 

1255 self.__datamodel.localdata_complete[local_ev.objtype].append(l_obj) 

1256 

1257 def __remoteRecycled( 

1258 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1259 ): 

1260 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1261 remote_event.objtype 

1262 ) 

1263 __hermes__.logger.debug( 

1264 f"__remoteRecycled({remote_event.toString(secretAttrs)})" 

1265 ) 

1266 maincache = self.__datamodel.remotedata[remote_event.objtype] 

1267 maincache_complete = self.__datamodel.remotedata_complete[remote_event.objtype] 

1268 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"] 

1269 trashbin_complete = self.__datamodel.remotedata_complete[ 

1270 f"trashbin_{remote_event.objtype}" 

1271 ] 

1272 

1273 r_obj = self.__datamodel.createRemoteDataobject( 

1274 remote_event.objtype, remote_event.objattrs 

1275 ) 

1276 

1277 self.__processLocalEvent( 

1278 remote_event, 

1279 local_event, 

1280 enqueueEventWithError=False, 

1281 simulateOnly=simulateOnly, 

1282 ) 

1283 

1284 # Remove remote object from trashbin 

1285 if not simulateOnly: 

1286 trashbin.removeByPkey(remote_event.objpkey) 

1287 trashbin_complete.removeByPkey(remote_event.objpkey) 

1288 # Restore remote object, with its potential changes, in main cache 

1289 if not simulateOnly: 

1290 maincache.append(r_obj) 

1291 # May already been recycled if current event is from errorqueue 

1292 if r_obj not in maincache_complete: 

1293 maincache_complete.append(r_obj) 

1294 

1295 def __localRecycled(self, local_ev: Event, simulateOnly: bool = False): 

1296 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1297 local_ev.objtype 

1298 ) 

1299 __hermes__.logger.debug(f"__localRecycled({local_ev.toString(secretAttrs)})") 

1300 maincache = self.__datamodel.localdata[local_ev.objtype] 

1301 maincache_complete = self.__datamodel.localdata_complete[local_ev.objtype] 

1302 trashbin = self.__datamodel.localdata[f"trashbin_{local_ev.objtype}"] 

1303 trashbin_complete = self.__datamodel.localdata_complete[ 

1304 f"trashbin_{local_ev.objtype}" 

1305 ] 

1306 

1307 l_obj = self.__datamodel.createLocalDataobject( 

1308 local_ev.objtype, local_ev.objattrs 

1309 ) 

1310 l_obj_trash: DataObject = deepcopy(trashbin.get(local_ev.objpkey)) 

1311 del l_obj_trash._trashbin_timestamp # Remove trashbin timestamp from object 

1312 

1313 l_obj_trash_complete: DataObject = deepcopy( 

1314 trashbin_complete.get(local_ev.objpkey) 

1315 ) 

1316 

1317 # May already been recycled if current event is from errorqueue 

1318 if l_obj_trash_complete is not None: 

1319 del ( 

1320 l_obj_trash_complete._trashbin_timestamp 

1321 ) # Remove trashbin timestamp from object 

1322 

1323 if not simulateOnly: 

1324 # Call recycled handler 

1325 self.__callHandler( 

1326 objtype=local_ev.objtype, 

1327 eventtype="recycled", 

1328 objkey=l_obj_trash.getPKey(), 

1329 eventattrs=l_obj_trash.toNative(), 

1330 newobj=deepcopy(l_obj_trash), 

1331 ) 

1332 

1333 if not simulateOnly: 

1334 # Remove local object from trashbin 

1335 trashbin.remove(l_obj_trash) 

1336 # Restore local object in main cache 

1337 maincache.append(l_obj_trash) 

1338 

1339 # May already been recycled if current event is from errorqueue 

1340 if l_obj_trash_complete is not None: 

1341 trashbin_complete.remove(l_obj_trash_complete) 

1342 maincache_complete.append(l_obj_trash_complete) 

1343 

1344 diff = l_obj.diffFrom(l_obj_trash) # Handle local object changes if any 

1345 if diff and not simulateOnly: 

1346 (event, obj) = Event.fromDiffItem( 

1347 diffitem=diff, 

1348 eventCategory=local_ev.evcategory, 

1349 changeType="modified", 

1350 ) 

1351 # Hack: we pass this second event (modified) to error queue in order to 

1352 # postpone its processing once all caches of previous one are up to date. 

1353 # Otherwise, if an error is met on this second event (modified), we'll try 

1354 # to reprocess the first one (recycled) 

1355 self.__datamodel.errorqueue.append( 

1356 remoteEvent=None, localEvent=event, errorMsg=None 

1357 ) 

1358 # ... and force error queue to be retried asap in order to process the 

1359 # pending event 

1360 self.__errorQueue_lastretry = datetime(year=1, month=1, day=1) 

1361 

1362 def __remoteModified( 

1363 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1364 ): 

1365 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1366 remote_event.objtype 

1367 ) 

1368 __hermes__.logger.debug( 

1369 f"__remoteModified({remote_event.toString(secretAttrs)})" 

1370 ) 

1371 maincache = self.__datamodel.remotedata[remote_event.objtype] 

1372 

1373 if not simulateOnly: 

1374 r_cachedobj: DataObject = maincache.get(remote_event.objpkey) 

1375 r_obj = Datamodel.getUpdatedObject(r_cachedobj, remote_event.objattrs) 

1376 

1377 cache_complete, r_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin( 

1378 self.__datamodel.remotedata_complete, 

1379 remote_event.objtype, 

1380 remote_event.objpkey, 

1381 ) 

1382 r_obj_complete = Datamodel.getUpdatedObject( 

1383 r_cachedobj_complete, remote_event.objattrs 

1384 ) 

1385 

1386 self.__processLocalEvent( 

1387 remote_event, 

1388 local_event, 

1389 enqueueEventWithError=False, 

1390 simulateOnly=simulateOnly, 

1391 ) 

1392 

1393 # Update remote object in cache 

1394 if not simulateOnly: 

1395 maincache.replace(r_obj) 

1396 

1397 # May not exist 

1398 if cache_complete is not None: 

1399 cache_complete.replace(r_obj_complete) 

1400 

1401 def __localModified(self, local_ev: Event, simulateOnly: bool = False): 

1402 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1403 local_ev.objtype 

1404 ) 

1405 __hermes__.logger.debug(f"__localModified({local_ev.toString(secretAttrs)})") 

1406 maincache = self.__datamodel.localdata[local_ev.objtype] 

1407 

1408 if not simulateOnly: 

1409 l_cachedobj: DataObject = maincache.get(local_ev.objpkey) 

1410 l_obj = Datamodel.getUpdatedObject(l_cachedobj, local_ev.objattrs) 

1411 

1412 cache_complete, l_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin( 

1413 self.__datamodel.localdata_complete, local_ev.objtype, local_ev.objpkey 

1414 ) 

1415 

1416 # May not exist 

1417 if cache_complete is not None: 

1418 l_obj_complete = Datamodel.getUpdatedObject( 

1419 l_cachedobj_complete, local_ev.objattrs 

1420 ) 

1421 

1422 if not simulateOnly: 

1423 # Call modified handler 

1424 self.__callHandler( 

1425 objtype=local_ev.objtype, 

1426 eventtype="modified", 

1427 objkey=local_ev.objpkey, 

1428 eventattrs=local_ev.objattrs, 

1429 newobj=deepcopy(l_obj), 

1430 cachedobj=deepcopy(l_cachedobj), 

1431 ) 

1432 

1433 # Update local object in cache 

1434 if not simulateOnly: 

1435 maincache.replace(l_obj) 

1436 

1437 # May not exist 

1438 if cache_complete is not None: 

1439 cache_complete.replace(l_obj_complete) 

1440 

1441 def __remoteTrashed( 

1442 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1443 ): 

1444 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1445 remote_event.objtype 

1446 ) 

1447 __hermes__.logger.debug( 

1448 f"__remoteTrashed({remote_event.toString(secretAttrs)})" 

1449 ) 

1450 maincache = self.__datamodel.remotedata[remote_event.objtype] 

1451 maincache_complete = self.__datamodel.remotedata_complete[remote_event.objtype] 

1452 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"] 

1453 trashbin_complete = self.__datamodel.remotedata_complete[ 

1454 f"trashbin_{remote_event.objtype}" 

1455 ] 

1456 

1457 r_cachedobj: DataObject = maincache.get(remote_event.objpkey) 

1458 r_cachedobj_complete: DataObject = maincache_complete.get(remote_event.objpkey) 

1459 

1460 self.__processLocalEvent( 

1461 remote_event, 

1462 local_event, 

1463 enqueueEventWithError=False, 

1464 simulateOnly=simulateOnly, 

1465 ) 

1466 

1467 if not simulateOnly: 

1468 # Remove remote object from cache 

1469 maincache.remove(r_cachedobj) 

1470 r_cachedobj._trashbin_timestamp = remote_event.timestamp 

1471 # Add remote object to trashbin 

1472 trashbin.append(r_cachedobj) 

1473 

1474 if r_cachedobj_complete is not None: 

1475 maincache_complete.remove(r_cachedobj_complete) 

1476 r_cachedobj_complete._trashbin_timestamp = remote_event.timestamp 

1477 trashbin_complete.append(r_cachedobj_complete) 

1478 

1479 def __localTrashed(self, local_ev: Event, simulateOnly: bool = False): 

1480 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1481 local_ev.objtype 

1482 ) 

1483 __hermes__.logger.debug(f"__localTrashed({local_ev.toString(secretAttrs)})") 

1484 maincache = self.__datamodel.localdata[local_ev.objtype] 

1485 maincache_complete = self.__datamodel.localdata_complete[local_ev.objtype] 

1486 trashbin = self.__datamodel.localdata[f"trashbin_{local_ev.objtype}"] 

1487 trashbin_complete = self.__datamodel.localdata_complete[ 

1488 f"trashbin_{local_ev.objtype}" 

1489 ] 

1490 

1491 l_cachedobj: DataObject = maincache.get(local_ev.objpkey) 

1492 l_cachedobj_complete: DataObject = maincache_complete.get(local_ev.objpkey) 

1493 

1494 if not simulateOnly: 

1495 # Call trashed handler 

1496 self.__callHandler( 

1497 objtype=local_ev.objtype, 

1498 eventtype="trashed", 

1499 objkey=local_ev.objpkey, 

1500 eventattrs=local_ev.objattrs, 

1501 cachedobj=deepcopy(l_cachedobj), 

1502 ) 

1503 

1504 if not simulateOnly: 

1505 # Remove local object from cache 

1506 maincache.remove(l_cachedobj) 

1507 l_cachedobj._trashbin_timestamp = local_ev.timestamp 

1508 # Add local object to trashbin 

1509 trashbin.append(l_cachedobj) 

1510 

1511 if l_cachedobj_complete is not None: 

1512 maincache_complete.remove(l_cachedobj_complete) 

1513 l_cachedobj_complete._trashbin_timestamp = local_ev.timestamp 

1514 trashbin_complete.append(l_cachedobj_complete) 

1515 

1516 def __remoteRemoved( 

1517 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1518 ): 

1519 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1520 remote_event.objtype 

1521 ) 

1522 __hermes__.logger.debug( 

1523 f"__remoteRemoved({remote_event.toString(secretAttrs)})" 

1524 ) 

1525 

1526 cache, r_cachedobj = Datamodel.getObjectFromCacheOrTrashbin( 

1527 self.__datamodel.remotedata, 

1528 remote_event.objtype, 

1529 remote_event.objpkey, 

1530 ) 

1531 

1532 cache_complete, r_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin( 

1533 self.__datamodel.remotedata_complete, 

1534 remote_event.objtype, 

1535 remote_event.objpkey, 

1536 ) 

1537 

1538 self.__processLocalEvent( 

1539 remote_event, 

1540 local_event, 

1541 enqueueEventWithError=False, 

1542 simulateOnly=simulateOnly, 

1543 ) 

1544 

1545 # Remove remote object from cache or trashbin 

1546 if not simulateOnly: 

1547 cache.remove(r_cachedobj) 

1548 

1549 # May already been removed if current event is from errorqueue 

1550 if cache_complete is not None: 

1551 cache_complete.remove(r_cachedobj_complete) 

1552 

1553 if not simulateOnly: 

1554 # Remove eventual events relative to current object from error queue 

1555 self.__datamodel.errorqueue.purgeAllEventsOfDataObject( 

1556 r_cachedobj, isLocalObjtype=False 

1557 ) 

1558 

1559 def __localRemoved(self, local_ev: Event, simulateOnly: bool = False): 

1560 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1561 local_ev.objtype 

1562 ) 

1563 __hermes__.logger.debug(f"__localRemoved({local_ev.toString(secretAttrs)})") 

1564 

1565 cache, l_cachedobj = Datamodel.getObjectFromCacheOrTrashbin( 

1566 self.__datamodel.localdata, 

1567 local_ev.objtype, 

1568 local_ev.objpkey, 

1569 ) 

1570 

1571 cache_complete, l_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin( 

1572 self.__datamodel.localdata_complete, 

1573 local_ev.objtype, 

1574 local_ev.objpkey, 

1575 ) 

1576 

1577 if not simulateOnly: 

1578 # Call removed handler 

1579 self.__callHandler( 

1580 objtype=local_ev.objtype, 

1581 eventtype="removed", 

1582 objkey=local_ev.objpkey, 

1583 eventattrs=local_ev.objattrs, 

1584 cachedobj=deepcopy(l_cachedobj), 

1585 ) 

1586 

1587 # Remove local object from cache or trashbin 

1588 if not simulateOnly: 

1589 cache.remove(l_cachedobj) 

1590 # May already been removed if current event is from errorqueue 

1591 if cache_complete is not None: 

1592 cache_complete.remove(l_cachedobj_complete) 

1593 

1594 if not simulateOnly: 

1595 # Remove eventual events relative to current object from error queue 

1596 self.__datamodel.errorqueue.purgeAllEventsOfDataObject( 

1597 l_cachedobj, isLocalObjtype=True 

1598 ) 

1599 

1600 def __callHandler(self, objtype: str, eventtype: str, **kwargs): 

1601 if not objtype: 

1602 handlerName = f"on_{eventtype}" 

1603 kwargs_filtered = kwargs 

1604 else: 

1605 handlerName = f"on_{objtype}_{eventtype}" 

1606 

1607 # Filter secrets values 

1608 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(objtype) 

1609 kwargs_filtered = kwargs.copy() 

1610 kwargs_filtered["eventattrs"] = Event.objattrsToString( 

1611 kwargs["eventattrs"], secretAttrs 

1612 ) 

1613 

1614 kwargsstr = ", ".join([f"{k}={repr(v)}" for k, v in kwargs_filtered.items()]) 

1615 

1616 hdlr = getattr(self, handlerName, None) 

1617 

1618 if not callable(hdlr): 

1619 __hermes__.logger.debug( 

1620 f"Calling '{handlerName}({kwargsstr})': handler '{handlerName}()'" 

1621 " doesn't exists" 

1622 ) 

1623 return 

1624 

1625 __hermes__.logger.info( 

1626 f"Calling '{handlerName}({kwargsstr})' - currentStep={self.currentStep}," 

1627 f" isPartiallyProcessed={self.isPartiallyProcessed}," 

1628 f" isAnErrorRetry={self.isAnErrorRetry}" 

1629 ) 

1630 

1631 try: 

1632 hdlr(**kwargs) 

1633 except Exception as e: 

1634 __hermes__.logger.error( 

1635 f"Calling '{handlerName}({kwargsstr})': error met on step" 

1636 f" {self.currentStep} '{str(e)}'" 

1637 ) 

1638 raise HermesClientHandlerError(e) 

1639 

1640 def __processDatamodelUpdate(self): 

1641 """Check difference between current datamodel and previous one. If datamodel has 

1642 changed, generate local events according to datamodel changes""" 

1643 

1644 diff = self.__newdatamodel.diffFrom(self.__datamodel) 

1645 

1646 if not diff: 

1647 __hermes__.logger.info("No change in datamodel") 

1648 # Start working with new datamodel 

1649 self.__datamodel = self.__newdatamodel 

1650 self.__datamodel.loadErrorQueue() 

1651 return 

1652 

1653 __hermes__.logger.info(f"Datamodel has changed: {diff.dict}") 

1654 self.__saveRequired = True 

1655 

1656 # Start by removed types, as it requires the previous datamodel to process data 

1657 # removal 

1658 if diff.removed: 

1659 for l_objtype in diff.removed: 

1660 __hermes__.logger.info(f"About to purge data from type '{l_objtype}'") 

1661 if l_objtype not in self.__datamodel.typesmapping.values(): 

1662 __hermes__.logger.warning( 

1663 f"Requested to purge data from type '{l_objtype}', but it" 

1664 " doesn't exist in previous datamodel: ignoring" 

1665 ) 

1666 continue 

1667 

1668 pkeys = ( 

1669 self.__datamodel.localdata[l_objtype].getPKeys() 

1670 | self.__datamodel.localdata[f"trashbin_{l_objtype}"].getPKeys() 

1671 ) 

1672 

1673 # Call remove on each object 

1674 for pkey in pkeys: 

1675 _, l_obj = Datamodel.getObjectFromCacheOrTrashbin( 

1676 self.__datamodel.localdata, l_objtype, pkey 

1677 ) 

1678 if l_obj: 

1679 l_ev = Event( 

1680 evcategory="base", 

1681 eventtype="removed", 

1682 obj=l_obj, 

1683 objattrs={}, 

1684 ) 

1685 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1686 l_objtype 

1687 ) 

1688 __hermes__.logger.debug( 

1689 f"Removing local object of {pkey=}:" 

1690 f" {l_ev.toString(secretAttrs)=}" 

1691 ) 

1692 self.__localRemoved(l_ev) 

1693 else: 

1694 __hermes__.logger.error(f"Local object of {pkey=} not found") 

1695 

1696 # All objects have been removed, remove remaining events from 

1697 # errorqueue, if any 

1698 for l_obj in self.__datamodel.localdata_complete[l_objtype]: 

1699 # Remove eventual events relative to current object from error queue 

1700 self.__datamodel.errorqueue.purgeAllEventsOfDataObject( 

1701 l_obj, isLocalObjtype=True 

1702 ) 

1703 

1704 self.__datamodel.saveLocalAndRemoteData() # Save changes 

1705 

1706 # Purge old remote and local cache files, if any 

1707 __hermes__.logger.info( 

1708 f"Types removed from Datamodel: {diff.removed}, purging cache files" 

1709 ) 

1710 Datamodel.purgeOldCacheFiles(diff.removed, cacheFilePrefix="__") 

1711 

1712 # Start working with new datamodel 

1713 self.__datamodel = self.__newdatamodel 

1714 self.__datamodel.loadLocalAndRemoteData() 

1715 

1716 # Reload error queue to allow it to handle new datamodel 

1717 self.__datamodel.saveErrorQueue() 

1718 self.__datamodel.loadErrorQueue() 

1719 

1720 if diff.added or diff.modified: 

1721 # Generate diff events according to datamodel changes 

1722 new_local_data: dict[str, DataObjectList] = {} 

1723 

1724 # Work on "complete" copy of data cache, representing the cache that should 

1725 # be without any event in error queue in order to compute diff on complete 

1726 # cache representation 

1727 completeRemoteData = self.__datamodel.remotedata_complete 

1728 completeLocalData = self.__datamodel.localdata_complete 

1729 

1730 # Loop over each remote type 

1731 for r_objtype in self.__datamodel.remote_schema.objectTypes: 

1732 # For each type, we'll work on classic data, and on trashbin data 

1733 for prefix in ("", "trashbin_"): 

1734 if r_objtype not in self.__datamodel.typesmapping: 

1735 continue # Remote objtype isn't set in current Datamodel 

1736 

1737 # Fetch corresponding local type 

1738 l_objtype = f"{prefix}{self.__datamodel.typesmapping[r_objtype]}" 

1739 

1740 # Convert remote data cache to local data 

1741 new_local_data[l_objtype] = ( 

1742 self.__datamodel.convertDataObjectListToLocal( 

1743 r_objtype, completeRemoteData[f"{prefix}{r_objtype}"] 

1744 ) 

1745 ) 

1746 

1747 # Compute differences between new local data and local data cache 

1748 completeLocalDataObjtype: DataObjectList = completeLocalData.get( 

1749 l_objtype, DataObjectList([]) 

1750 ) 

1751 datadiff = new_local_data[l_objtype].diffFrom( 

1752 completeLocalDataObjtype 

1753 ) 

1754 

1755 for changeType, difflist in datadiff.dict.items(): 

1756 diffitem: DiffObject | DataObject 

1757 for diffitem in difflist: 

1758 # Convert diffitem to local Event 

1759 (event, obj) = Event.fromDiffItem( 

1760 diffitem=diffitem, 

1761 eventCategory="base", 

1762 changeType=changeType, 

1763 ) 

1764 

1765 if prefix == "trashbin_": 

1766 if obj not in completeLocalDataObjtype: 

1767 # Object exists in remote trashbin, but not in 

1768 # local one as it has been removed before its type 

1769 # was added to client's Datamodel. 

1770 # Process a local "added" event, then a local 

1771 # "removed" event to store local object in trashbin 

1772 

1773 # Add local object 

1774 self.__processLocalEvent( 

1775 None, event, enqueueEventWithError=True 

1776 ) 

1777 

1778 # Prepare "removed" event 

1779 event = Event( 

1780 evcategory="base", 

1781 eventtype="removed", 

1782 obj=obj, 

1783 objattrs={}, 

1784 ) 

1785 # Preserve object _trashbin_timestamp 

1786 event.timestamp = completeRemoteData[ 

1787 f"{prefix}{r_objtype}" 

1788 ][obj]._trashbin_timestamp 

1789 else: 

1790 # Preserve object _trashbin_timestamp 

1791 obj._trashbin_timestamp = completeLocalDataObjtype[ 

1792 obj 

1793 ]._trashbin_timestamp 

1794 

1795 # Process Event and update cache if no error is met, 

1796 # enqueue event otherwise 

1797 self.__processLocalEvent( 

1798 None, event, enqueueEventWithError=True 

1799 ) 

1800 

1801 self.__config.savecachefile() # Save config to be able to rebuild datamodel 

1802 self.__datamodel.saveLocalAndRemoteData() # Save data 

1803 self.__checkDatamodelWarnings() 

1804 

1805 def __status( 

1806 self, verbose=False, level="information", ignoreUnhandledExceptions=False 

1807 ) -> dict[str, dict[str, dict[str, Any]]]: 

1808 """Returns a dict containing status for current client Datamodel and error 

1809 queue. 

1810 

1811 Each status contains 3 categories/levels: "information", "warning" and "error" 

1812 """ 

1813 if level not in ("information", "warning", "error"): 

1814 raise AttributeError( 

1815 f"Specified level '{level}' is invalid. Possible values are" 

1816 """("information", "warning", "error"):""" 

1817 ) 

1818 

1819 appname: str = self.__config["appname"] 

1820 

1821 match level: 

1822 case "error": 

1823 levels = ["error"] 

1824 case "warning": 

1825 levels = [ 

1826 "warning", 

1827 "error", 

1828 ] 

1829 case "information": 

1830 levels = [ 

1831 "information", 

1832 "warning", 

1833 "error", 

1834 ] 

1835 

1836 res = { 

1837 appname: { 

1838 "information": { 

1839 "startTime": self.__startTime.strftime("%Y-%m-%d %H:%M:%S"), 

1840 "status": "paused" if self.__isPaused else "running", 

1841 "pausedSince": ( 

1842 self.__isPaused.strftime("%Y-%m-%d %H:%M:%S") 

1843 if self.__isPaused 

1844 else "None" 

1845 ), 

1846 }, 

1847 "warning": {}, 

1848 "error": {}, 

1849 }, 

1850 } 

1851 if not ignoreUnhandledExceptions and self.__cache.exception: 

1852 res[appname]["error"]["unhandledException"] = self.__cache.exception 

1853 

1854 # Datamodel 

1855 res["datamodel"] = { 

1856 "information": {}, 

1857 "warning": {}, 

1858 "error": {}, 

1859 } 

1860 if self.__datamodel.unknownRemoteTypes: 

1861 res["datamodel"]["warning"]["unknownRemoteTypes"] = sorted( 

1862 self.__datamodel.unknownRemoteTypes 

1863 ) 

1864 if self.__datamodel.unknownRemoteAttributes: 

1865 res["datamodel"]["warning"]["unknownRemoteAttributes"] = { 

1866 k: sorted(v) 

1867 for k, v in self.__datamodel.unknownRemoteAttributes.items() 

1868 } 

1869 

1870 # Error queue 

1871 res["errorQueue"] = { 

1872 "information": {}, 

1873 "warning": {}, 

1874 "error": {}, 

1875 } 

1876 

1877 if self.__datamodel.errorqueue is not None: 

1878 eventNumber: int 

1879 remoteEvent: Event | None 

1880 localEvent: Event 

1881 errorMsg: str 

1882 for ( 

1883 eventNumber, 

1884 remoteEvent, 

1885 localEvent, 

1886 errorMsg, 

1887 ) in self.__datamodel.errorqueue: 

1888 if errorMsg is None: 

1889 # Ignore the events in queue that are not errors 

1890 continue 

1891 

1892 # Always try to get object from local cache in order to use configured 

1893 # toString template for obj repr() 

1894 objtype = localEvent.objtype 

1895 

1896 _, obj = Datamodel.getObjectFromCacheOrTrashbin( 

1897 self.__datamodel.localdata_complete, objtype, localEvent.objpkey 

1898 ) 

1899 if obj is None: 

1900 _, obj = Datamodel.getObjectFromCacheOrTrashbin( 

1901 self.__datamodel.localdata, objtype, localEvent.objpkey 

1902 ) 

1903 if obj is None and remoteEvent is not None: 

1904 _, obj = Datamodel.getObjectFromCacheOrTrashbin( 

1905 self.__datamodel.remotedata_complete, 

1906 remoteEvent.objtype, 

1907 remoteEvent.objpkey, 

1908 ) 

1909 if obj is None: 

1910 obj = f"<{localEvent.objtype}[{localEvent.objpkey}]>" 

1911 else: 

1912 obj = repr(obj) 

1913 

1914 res["errorQueue"]["error"][eventNumber] = { 

1915 "objrepr": obj, 

1916 "errorMsg": errorMsg, 

1917 } 

1918 if verbose: 

1919 res["errorQueue"]["error"][eventNumber] |= { 

1920 "objtype": localEvent.objtype, 

1921 "objpkey": localEvent.objpkey, 

1922 "objattrs": localEvent.objattrs, 

1923 } 

1924 

1925 # Clean empty categories 

1926 for objname in list(res.keys()): 

1927 for category in ("information", "warning", "error"): 

1928 if category not in levels or ( 

1929 not verbose and not res[objname][category] 

1930 ): 

1931 del res[objname][category] 

1932 

1933 if not verbose and not res[objname]: 

1934 del res[objname] 

1935 

1936 return res 

1937 

1938 def __notifyQueueErrors(self): 

1939 """Notify of any objects change in error queue""" 

1940 new_error = self.__status(level="error", ignoreUnhandledExceptions=True) 

1941 

1942 new_errors = {} 

1943 if "errorQueue" in new_error: 

1944 for errNumber, err in new_error["errorQueue"]["error"].items(): 

1945 new_errors[errNumber] = f"{err['objrepr']}: {err['errorMsg']}" 

1946 

1947 new_errstr = json.dumps( 

1948 new_errors, 

1949 cls=JSONEncoder, 

1950 indent=4, 

1951 ) 

1952 old_errstr = json.dumps(self.__cache.queueErrors, cls=JSONEncoder, indent=4) 

1953 

1954 if new_errstr != old_errstr: 

1955 if new_errors: 

1956 desc = "objects in error queue have changed" 

1957 else: 

1958 desc = "no more objects in error queue" 

1959 

1960 __hermes__.logger.info(desc) 

1961 Email.sendDiff( 

1962 config=self.__config, 

1963 contentdesc=desc, 

1964 previous=old_errstr, 

1965 current=new_errstr, 

1966 ) 

1967 self.__cache.queueErrors = new_errors 

1968 

1969 def __notifyDatamodelWarnings(self): 

1970 """Notify of any data model warnings changes""" 

1971 new_errors = self.__status(level="warning", ignoreUnhandledExceptions=True) 

1972 

1973 if "datamodel" not in new_errors: 

1974 new_errors = {} 

1975 else: 

1976 new_errors = new_errors["datamodel"] 

1977 

1978 new_errstr = json.dumps( 

1979 new_errors, 

1980 cls=JSONEncoder, 

1981 indent=4, 

1982 ) 

1983 old_errstr = json.dumps( 

1984 self.__cache.datamodelWarnings, cls=JSONEncoder, indent=4 

1985 ) 

1986 

1987 if new_errors: 

1988 __hermes__.logger.error("Datamodel has warnings:\n" + new_errstr) 

1989 

1990 if new_errstr != old_errstr: 

1991 if new_errors: 

1992 desc = "datamodel warnings have changed" 

1993 else: 

1994 desc = "no more datamodel warnings" 

1995 

1996 __hermes__.logger.info(desc) 

1997 Email.sendDiff( 

1998 config=self.__config, 

1999 contentdesc=desc, 

2000 previous=old_errstr, 

2001 current=new_errstr, 

2002 ) 

2003 self.__cache.datamodelWarnings = new_errors 

2004 

2005 def __notifyException(self, trace: str | None): 

2006 """Notify of any unhandled exception met/solved""" 

2007 if trace: 

2008 __hermes__.logger.critical(f"Unhandled exception: {trace}") 

2009 

2010 if self.__cache.exception != trace: 

2011 if trace: 

2012 desc = "unhandled exception" 

2013 else: 

2014 desc = "no more unhandled exception" 

2015 

2016 __hermes__.logger.info(desc) 

2017 previous = "" if self.__cache.exception is None else self.__cache.exception 

2018 current = "" if trace is None else trace 

2019 Email.sendDiff( 

2020 config=self.__config, 

2021 contentdesc=desc, 

2022 previous=previous, 

2023 current=current, 

2024 ) 

2025 self.__cache.exception = trace 

2026 

2027 def __notifyFatalException(self, trace: str): 

2028 """Notify of any fatal exception met before, and terminate app""" 

2029 self.__isStopped = True 

2030 

2031 desc = "Unhandled fatal exception, APP WILL TERMINATE IMMEDIATELY" 

2032 NL = "\n" 

2033 

2034 __hermes__.logger.critical(f"{desc}: {trace}") 

2035 

2036 Email.send( 

2037 config=self.__config, 

2038 subject=f"[{self.__config['appname']}] {desc}", 

2039 content=f"{desc}:{NL}{NL}{trace}", 

2040 )