Coverage for clients / __init__.py: 83%

890 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:11 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from clients.datamodel import Datamodel, InvalidDataError 

24from lib.config import HermesConfig 

25from lib.version import HERMES_VERSION 

26from lib.datamodel.dataobject import DataObject 

27from lib.datamodel.dataobjectlist import DataObjectList 

28from lib.datamodel.datasource import Dataschema, Datasource 

29from lib.datamodel.diffobject import DiffObject 

30from lib.datamodel.event import Event 

31from lib.datamodel.serialization import LocalCache, JSONEncoder 

32from lib.plugins import AbstractMessageBusConsumerPlugin 

33from lib.utils.mail import Email 

34from lib.utils.socket import ( 

35 SockServer, 

36 SocketMessageToServer, 

37 SocketMessageToClient, 

38 SocketArgumentParser, 

39 SocketParsingError, 

40 SocketParsingMessage, 

41) 

42 

43from copy import deepcopy 

44from datetime import datetime, timedelta 

45from time import sleep 

46from types import FrameType 

47from typing import Any 

48import argparse 

49import json 

50import signal 

51import traceback 

52 

53 

54class HermesAlreadyNotifiedException(Exception): 

55 """Raised when an exception has already been notified, to avoid a second 

56 notification""" 

57 

58 

59class HermesClientHandlerError(Exception): 

60 """Raised when an exception is met during client handler call""" 

61 

62 def __init__(self, err: Exception | str | None): 

63 if isinstance(err, Exception): 

64 self.msg: str | None = HermesClientHandlerError.exceptionToString(err) 

65 """Printable error message, None in a rare case where exception is raised 

66 without error, but to postpone an event processing""" 

67 else: 

68 self.msg = err 

69 super().__init__(self.msg) 

70 

71 @staticmethod 

72 def exceptionToString( 

73 exception: Exception, purgeCurrentFileFromTrace: bool = True 

74 ) -> str: 

75 """Convert the specified exception to a string containing its full trace""" 

76 lines = traceback.format_exception( 

77 type(exception), exception, exception.__traceback__ 

78 ) 

79 

80 if purgeCurrentFileFromTrace: 

81 # Purging current file infos from traceback 

82 lines = [line for line in lines if __file__ not in line] 

83 

84 return "".join(lines).strip() 

85 

86 

87class HermesClientCache(LocalCache): 

88 """Hermes client data to cache""" 

89 

90 def __init__(self, from_json_dict: dict[str, Any] = {}): 

91 super().__init__( 

92 jsondataattr=[ 

93 "queueErrors", 

94 "datamodelWarnings", 

95 "exception", 

96 "initstartoffset", 

97 "initstopoffset", 

98 "nextoffset", 

99 ], 

100 ) 

101 

102 self.queueErrors: dict[str, str] = from_json_dict.get("queueErrors", {}) 

103 """Dictionary containing current objects in error queue, for notifications""" 

104 

105 self.datamodelWarnings: dict[str, dict[str, dict[str, Any]]] = ( 

106 from_json_dict.get("datamodelWarnings", {}) 

107 ) 

108 """Dictionary containing current datamodel warnings, for notifications""" 

109 

110 self.exception: str | None = from_json_dict.get("exception") 

111 """String containing latest exception trace""" 

112 

113 self.initstartoffset: Any | None = from_json_dict.get("initstartoffset") 

114 """Contains the offset of the first message of initSync sequence on message 

115 bus""" 

116 self.initstopoffset: Any | None = from_json_dict.get("initstopoffset") 

117 """Contains the offset of the last message of initSync sequence on message 

118 bus""" 

119 self.nextoffset: Any | None = from_json_dict.get("nextoffset") 

120 """Contains the offset of the next message to process on message bus""" 

121 

122 def savecachefile(self, cacheFilename: str | None = None): 

123 """Override method only to disable backup files in cache""" 

124 return super().savecachefile(cacheFilename, dontKeepBackup=True) 

125 

126 

127class GenericClient: 

128 """Superclass of all hermes-client implementations. 

129 Manage all the internals of hermes for a client: datamodel updates, caching, error 

130 management, trashbin and converting messages from message bus into events handlers 

131 calls""" 

132 

133 __FOREIGNKEYS_POLICIES: dict[str, tuple[str]] = { 

134 "disabled": tuple(), 

135 "on_remove_event": ("removed",), 

136 "on_every_event": ("added", "modified", "removed"), 

137 } 

138 """Different foreignkeys_policy settings : associate each foreignkeys_policy 

139 (as key) with the list of event types that will be placed in the error queue if the 

140 object concerning them is the parent (by foreign key) of an object already present 

141 in the error queue""" 

142 

143 def __init__(self, config: HermesConfig): 

144 """Instantiate a new client""" 

145 

146 __hermes__.logger.info(f"Starting {config['appname']} v{HERMES_VERSION}") 

147 

148 # Setup the signals handler 

149 config.setSignalsHandler(self.__signalHandler) 

150 

151 self.__config: HermesConfig = config 

152 """Current config""" 

153 try: 

154 self.config: dict[str, Any] = self.__config[self.__config["appname"]] 

155 """Dict containing the client plugin configuration""" 

156 except KeyError: 

157 self.config = {} 

158 

159 self.__previousconfig: HermesConfig = HermesConfig.loadcachefile( 

160 "_hermesconfig" 

161 ) 

162 """Previous config (from cache)""" 

163 

164 self.__msgbus: AbstractMessageBusConsumerPlugin = self.__config["hermes"][ 

165 "plugins" 

166 ]["messagebus"]["plugininstance"] 

167 self.__msgbus.setTimeout( 

168 self.__config["hermes-client"]["updateInterval"] * 1000 

169 ) 

170 

171 self.__cache: HermesClientCache = HermesClientCache.loadcachefile( 

172 f"_{self.__config['appname']}" 

173 ) 

174 """Cached attributes""" 

175 self.__cache.setCacheFilename(f"_{self.__config['appname']}") 

176 

177 self.__startTime: datetime | None = None 

178 """Datetime when mainloop was started""" 

179 

180 self.__isPaused: datetime | None = None 

181 """Contains pause datetime if standard processing is paused, None otherwise""" 

182 

183 self.__isStopped: bool = False 

184 """mainloop() will run until this var is set to True""" 

185 

186 self.__numberOfLoopToProcess: int | None = None 

187 """**For functionnal tests only**, if a value is set, will process for *value* 

188 iterations of mainloop and pause execution until a new positive value is set""" 

189 

190 self.__sock: SockServer | None = None 

191 """Facultative socket to allow cli communication""" 

192 if ( 

193 self.__config["hermes"]["cli_socket"]["path"] is not None 

194 or self.__config["hermes"]["cli_socket"]["dont_manage_sockfile"] is not None 

195 ): 

196 self.__sock = SockServer( 

197 path=self.__config["hermes"]["cli_socket"]["path"], 

198 owner=self.__config["hermes"]["cli_socket"]["owner"], 

199 group=self.__config["hermes"]["cli_socket"]["group"], 

200 mode=self.__config["hermes"]["cli_socket"]["mode"], 

201 processHdlr=self.__processSocketMessage, 

202 dontManageSockfile=self.__config["hermes"]["cli_socket"][ 

203 "dont_manage_sockfile" 

204 ], 

205 ) 

206 self.__setupSocketParser() 

207 

208 self.__useFirstInitsyncSequence: bool = self.__config["hermes-client"][ 

209 "useFirstInitsyncSequence" 

210 ] 

211 """Indicate if we prefer using the first/oldest or last/most recent 

212 initsync sequence available on message bus""" 

213 

214 self.__newdatamodel: Datamodel = Datamodel(config=self.__config) 

215 """New datamodel (from current config)""" 

216 

217 if self.__previousconfig.hasData(): 

218 # Start app with previous datamodel to be able to check differences with 

219 # new one through self.__processDatamodelUpdate() 

220 self.__datamodel: Datamodel = Datamodel(config=self.__previousconfig) 

221 """Current datamodel""" 

222 else: 

223 # As no previous datamodel is available, start app with new one 

224 self.__datamodel = self.__newdatamodel 

225 

226 self.__trashbin_retention: timedelta | None = None 

227 """Timedelta with delay to keep removed data in trashbin before permanently 

228 deleting it. 'None' means no trashbin""" 

229 if self.__config["hermes-client"]["trashbin_retention"] > 0: 

230 self.__trashbin_retention = timedelta( 

231 days=self.__config["hermes-client"]["trashbin_retention"] 

232 ) 

233 

234 self.__trashbin_purgeInterval: timedelta | None = timedelta( 

235 minutes=self.__config["hermes-client"]["trashbin_purgeInterval"] 

236 ) 

237 """Timedelta with delay between two trashbin purge attempts""" 

238 

239 self.__trashbin_lastpurge: datetime = datetime(year=1, month=1, day=1) 

240 """Datetime when latest trashbin purge was ran""" 

241 

242 self.__errorQueue_retryInterval: timedelta | None = timedelta( 

243 minutes=self.__config["hermes-client"]["errorQueue_retryInterval"] 

244 ) 

245 """Timedelta with delay between two attempts of processing events in error""" 

246 

247 self.__errorQueue_lastretry: datetime = datetime(year=1, month=1, day=1) 

248 """Datetime when latest error queue retry was ran""" 

249 

250 self.__currentStep: int = 0 

251 """Store the step number of current event processing. Will be stored in events 

252 in error queue to allow clients to resume an event where it has failed""" 

253 

254 self.__isPartiallyProcessed: bool = False 

255 """Store if some data has been processed during current event processing. Will 

256 be stored in events in error queue to handle autoremediation properly""" 

257 

258 self.__isAnErrorRetry: bool = False 

259 """Indicate to handler whether the current event is being processed as part of 

260 an error retry""" 

261 

262 self.__saveRequired: bool = False 

263 """Reset to False at each loop start, and if any change is made during 

264 processing, set to True in order to save all cache at the loop end. 

265 Used to avoid expensive .save() calls when unnecessary 

266 """ 

267 

268 self.__foreignkeys_events: tuple[str] = self.__FOREIGNKEYS_POLICIES[ 

269 self.__config["hermes-client"]["foreignkeys_policy"] 

270 ] 

271 """List of event types that will be placed in the error queue if the object 

272 concerning them is the parent (by foreign key) of an object already present in 

273 the error queue""" 

274 

275 def getObjectFromCache(self, objtype: str, objpkey: Any) -> DataObject: 

276 """Returns a deepcopy of an object from cache. 

277 Raise IndexError if objtype is invalid, or if objpkey is not found 

278 """ 

279 ds: Datasource = self.__datamodel.localdata 

280 

281 _, obj = Datamodel.getObjectFromCacheOrTrashbin(ds, objtype, objpkey) 

282 if obj is None: 

283 raise IndexError( 

284 f"No object of {objtype=} with {objpkey=} was found in cache" 

285 ) 

286 

287 return deepcopy(obj) 

288 

289 def getDataobjectlistFromCache(self, objtype: str) -> DataObjectList: 

290 """Returns cache of specified objtype, by reference. 

291 WARNING: Any modification of the cache content will mess up your client !!! 

292 Raise IndexError if objtype is invalid 

293 """ 

294 ds: Datasource = self.__datamodel.localdata 

295 cache = ds[objtype] 

296 trashbin = ds[f"trashbin_{objtype}"] 

297 

298 # Create an empty DataObjectList of same type as cache 

299 res = type(cache)(objlist=[]) 

300 

301 res.extend(cache) 

302 res.extend(trashbin) 

303 return res 

304 

305 def __signalHandler(self, signalnumber: int, frame: FrameType | None): 

306 """Signal handler that will be called on SIGINT and SIGTERM""" 

307 __hermes__.logger.critical( 

308 f"Signal '{signal.strsignal(signalnumber)}' received, terminating" 

309 ) 

310 self.__isStopped = True 

311 

312 def __setupSocketParser(self): 

313 """Set up the argparse context for unix socket commands""" 

314 self.__parser = SocketArgumentParser( 

315 prog=f"{self.__config['appname']}-cli", 

316 description=f"Hermes client {self.__config['appname']} CLI", 

317 exit_on_error=False, 

318 ) 

319 

320 subparsers = self.__parser.add_subparsers(help="Sub-commands") 

321 

322 # Quit 

323 sp_quit = subparsers.add_parser("quit", help=f"Stop {self.__config['appname']}") 

324 sp_quit.set_defaults(func=self.__sock_quit) 

325 

326 # Pause 

327 sp_pause = subparsers.add_parser( 

328 "pause", help="Pause processing until 'resume' command is sent" 

329 ) 

330 sp_pause.set_defaults(func=self.__sock_pause) 

331 

332 # Resume 

333 sp_resume = subparsers.add_parser( 

334 "resume", help="Resume processing that has been paused with 'pause'" 

335 ) 

336 sp_resume.set_defaults(func=self.__sock_resume) 

337 

338 # Status 

339 sp_status = subparsers.add_parser( 

340 "status", help=f"Show {self.__config['appname']} status" 

341 ) 

342 sp_status.set_defaults(func=self.__sock_status) 

343 sp_status.add_argument( 

344 "-j", 

345 "--json", 

346 action="store_const", 

347 const=True, 

348 default=False, 

349 help="Print status as json", 

350 ) 

351 sp_status.add_argument( 

352 "-v", 

353 "--verbose", 

354 action="store_const", 

355 const=True, 

356 default=False, 

357 help="Output items without values", 

358 ) 

359 

360 def __processSocketMessage( 

361 self, msg: SocketMessageToServer 

362 ) -> SocketMessageToClient: 

363 """Handler that process specified msg received on unix socket and returns the 

364 answer to send""" 

365 reply: SocketMessageToClient | None = None 

366 

367 try: 

368 args = self.__parser.parse_args(msg.argv) 

369 if "func" not in args: 

370 raise SocketParsingMessage(self.__parser.format_help()) 

371 except (SocketParsingError, SocketParsingMessage) as e: 

372 retmsg = str(e) 

373 except argparse.ArgumentError as e: 

374 retmsg = self.__parser.format_error(str(e)) 

375 else: 

376 try: 

377 reply = args.func(args) 

378 except Exception as e: 

379 lines = traceback.format_exception(type(e), e, e.__traceback__) 

380 trace = "".join(lines).strip() 

381 __hermes__.logger.critical(f"Unhandled exception: {trace}") 

382 retmsg = trace 

383 

384 if reply is None: # Error was met 

385 reply = SocketMessageToClient(retcode=1, retmsg=retmsg) 

386 

387 return reply 

388 

389 def __sock_quit(self, args: argparse.Namespace) -> SocketMessageToClient: 

390 """Handler called when quit subcommand is requested on unix socket""" 

391 self.__isStopped = True 

392 __hermes__.logger.info(f"{self.__config['appname']} has been requested to quit") 

393 return SocketMessageToClient(retcode=0, retmsg="") 

394 

395 def __sock_pause(self, args: argparse.Namespace) -> SocketMessageToClient: 

396 """Handler called when pause subcommand is requested on unix socket""" 

397 if self.__isStopped: 

398 return SocketMessageToClient( 

399 retcode=1, 

400 retmsg=f"Error: {self.__config['appname']} is currently being stopped", 

401 ) 

402 

403 if self.__isPaused: 

404 return SocketMessageToClient( 

405 retcode=1, 

406 retmsg=f"Error: {self.__config['appname']} is already paused", 

407 ) 

408 

409 __hermes__.logger.info( 

410 f"{self.__config['appname']} has been requested to pause" 

411 ) 

412 self.__isPaused = datetime.now() 

413 return SocketMessageToClient(retcode=0, retmsg="") 

414 

415 def __sock_resume(self, args: argparse.Namespace) -> SocketMessageToClient: 

416 """Handler called when resume subcommand is requested on unix socket""" 

417 if self.__isStopped: 

418 return SocketMessageToClient( 

419 retcode=1, 

420 retmsg=f"Error: {self.__config['appname']} is currently being stopped", 

421 ) 

422 

423 if not self.__isPaused: 

424 return SocketMessageToClient( 

425 retcode=1, retmsg=f"Error: {self.__config['appname']} is not paused" 

426 ) 

427 

428 __hermes__.logger.info( 

429 f"{self.__config['appname']} has been requested to resume" 

430 ) 

431 self.__isPaused = None 

432 return SocketMessageToClient(retcode=0, retmsg="") 

433 

434 def __sock_status(self, args: argparse.Namespace) -> SocketMessageToClient: 

435 """Handler called when status subcommand is requested on unix socket""" 

436 status = self.__status(verbose=args.verbose) 

437 if args.json: 

438 msg = json.dumps(status, cls=JSONEncoder, indent=4) 

439 else: 

440 nl = "\n" 

441 info2printable = {} 

442 msg = "" 

443 for objname in [self.__config["appname"]] + list( 

444 status.keys() - (self.__config["appname"],) 

445 ): 

446 infos = status[objname] 

447 msg += f"{objname}:{nl}" 

448 for category in ("information", "warning", "error"): 

449 if category not in infos: 

450 continue 

451 if not infos[category]: 

452 msg += f" * {category.capitalize()}: []{nl}" 

453 continue 

454 

455 msg += f" * {category.capitalize()}{nl}" 

456 for infoname, infodata in infos[category].items(): 

457 indentedinfodata = str(infodata).replace("\n", "\n ") 

458 msg += ( 

459 f" - {info2printable.get(infoname, infoname)}:" 

460 f" {indentedinfodata}{nl}" 

461 ) 

462 msg = msg.rstrip() 

463 

464 return SocketMessageToClient(retcode=0, retmsg=msg) 

465 

466 @property 

467 def currentStep(self) -> int: 

468 """Step number of current event processed. 

469 Allow clients to resume an event where it has failed""" 

470 return self.__currentStep 

471 

472 @currentStep.setter 

473 def currentStep(self, value: int): 

474 if type(value) is not int: 

475 raise TypeError( 

476 f"Specified step {value=} has invalid type '{type(value)}'" 

477 " instead of int" 

478 ) 

479 

480 if value < 0: 

481 raise ValueError(f"Specified step {value=} must be greater or equal to 0") 

482 

483 self.__currentStep = value 

484 

485 @property 

486 def isPartiallyProcessed(self) -> bool: 

487 """Indicate if some data has been processed during current event processing. 

488 Required to handle autoremediation properly""" 

489 return self.__isPartiallyProcessed 

490 

491 @isPartiallyProcessed.setter 

492 def isPartiallyProcessed(self, value: bool): 

493 if type(value) is not bool: 

494 raise TypeError( 

495 f"Specified isPartiallyProcessed {value=} has invalid type" 

496 f" '{type(value)}' instead of bool" 

497 ) 

498 

499 self.__isPartiallyProcessed = value 

500 

501 @property 

502 def isAnErrorRetry(self) -> bool: 

503 """Read-only attribute that indicates to handler whether the current event is 

504 being processed as part of an error retry""" 

505 return self.__isAnErrorRetry 

506 

507 def mainLoop(self): 

508 """Client main loop""" 

509 self.__startTime = datetime.now() 

510 

511 self.__checkDatamodelWarnings() 

512 # TODO: implement a check to ensure subclasses required data types and 

513 # attributes exist in datamodel 

514 

515 if self.__datamodel.hasRemoteSchema(): 

516 __hermes__.logger.debug( 

517 "Remote Dataschema in cache:" 

518 f" {self.__datamodel.remote_schema.to_json()=}" 

519 ) 

520 self.__datamodel.loadErrorQueue() 

521 else: 

522 __hermes__.logger.debug("No remote Dataschema in cache yet") 

523 

524 if self.__sock is not None: 

525 self.__sock.startProcessMessagesDaemon(appname=__hermes__.appname) 

526 

527 # Reduce sleep duration during functional tests to speed them up 

528 sleepDuration = 1 if self.__numberOfLoopToProcess is None else 0.05 

529 

530 isFirstLoopIteration: bool = True 

531 while not self.__isStopped: 

532 self.__saveRequired = False 

533 

534 try: 

535 with self.__msgbus: 

536 if self.__isPaused or self.__numberOfLoopToProcess == 0: 

537 sleep(sleepDuration) 

538 continue 

539 

540 try: 

541 if self.__hasAlreadyBeenInitialized(): 

542 if isFirstLoopIteration: 

543 try: 

544 self.__processDatamodelUpdate() 

545 except Exception as e: 

546 self.__notifyFatalException( 

547 HermesClientHandlerError.exceptionToString( 

548 e, purgeCurrentFileFromTrace=False 

549 ) 

550 ) 

551 raise HermesAlreadyNotifiedException 

552 self.__retryErrorQueue() 

553 self.__emptyTrashBin() 

554 self.__processEvents(isInitSync=False) 

555 else: 

556 __hermes__.logger.info( 

557 "Client hasn't ran its first initsync sequence yet" 

558 ) 

559 if self.__canBeInitialized(): 

560 __hermes__.logger.info( 

561 "First initsync sequence processing begins" 

562 ) 

563 self.__processEvents(isInitSync=True) 

564 if self.__hasAlreadyBeenInitialized(): 

565 __hermes__.logger.info( 

566 "First initsync sequence processing completed" 

567 ) 

568 else: 

569 __hermes__.logger.info( 

570 "No initsync sequence is available on message bus." 

571 " Retry..." 

572 ) 

573 

574 self.__notifyException(None) 

575 

576 except HermesAlreadyNotifiedException: 

577 pass 

578 except InvalidDataError as e: 

579 self.__notifyFatalException( 

580 HermesClientHandlerError.exceptionToString( 

581 e, purgeCurrentFileFromTrace=False 

582 ) 

583 ) 

584 except Exception as e: 

585 self.__notifyException( 

586 HermesClientHandlerError.exceptionToString( 

587 e, purgeCurrentFileFromTrace=False 

588 ) 

589 ) 

590 finally: 

591 isFirstLoopIteration = False 

592 # Still could be True if an exception was raised in 

593 # __retryErrorQueue() 

594 self.__isAnErrorRetry = False 

595 except Exception as e: 

596 __hermes__.logger.warning( 

597 "Message bus seems to be unavailable." 

598 " Waiting 60 seconds before retrying" 

599 ) 

600 self.__notifyException( 

601 HermesClientHandlerError.exceptionToString( 

602 e, purgeCurrentFileFromTrace=False 

603 ) 

604 ) 

605 # Wait one second 60 times to avoid waiting too long before stopping 

606 for i in range(60): 

607 if self.__isStopped: 

608 break 

609 sleep(1) 

610 finally: 

611 if self.__saveRequired or self.__isStopped: 

612 self.__datamodel.saveErrorQueue() 

613 if self.__hasAtLeastBeganInitialization(): 

614 self.__datamodel.saveLocalAndRemoteData() 

615 # Reset current step for "on_save" event 

616 self.currentStep = 0 

617 self.isPartiallyProcessed = False 

618 # Call special event "on_save()" 

619 self.__callHandler("", "save") 

620 self.__notifyQueueErrors() 

621 self.__cache.savecachefile() 

622 

623 if self.__isStopped: 

624 # Only to ensure cache files version update is saved, 

625 # to avoid version migrations at each restart, 

626 # as those files aren't expected to be updated often 

627 if ( 

628 self.__hasAtLeastBeganInitialization() 

629 and self.__cache.nextoffset is not None 

630 and self.__cache.nextoffset > self.__cache.initstartoffset 

631 ): 

632 # Do not save until a first event has been properly handled 

633 self.__config.savecachefile() 

634 self.__datamodel.remote_schema.savecachefile() 

635 

636 # Only used in functionnal tests 

637 if self.__numberOfLoopToProcess: 

638 self.__numberOfLoopToProcess -= 1 

639 

640 def __retryErrorQueue(self): 

641 # Enforce retryInterval 

642 now = datetime.now() 

643 if now < self.__errorQueue_lastretry + self.__errorQueue_retryInterval: 

644 return # Too early to process again 

645 

646 # All events processed in __retryErrorQueue() require this attribute to be True 

647 self.__isAnErrorRetry = True 

648 

649 done = False 

650 evNumbersToRetry: list[int] = [] 

651 eventNumber: int 

652 localEvent: Event 

653 remoteEvent: Event | None 

654 while not done: 

655 retryQueue: list[int] = [] 

656 previousKeys = self.__datamodel.errorqueue.keys() 

657 

658 for ( 

659 eventNumber, 

660 remoteEvent, 

661 localEvent, 

662 errorMsg, 

663 ) in self.__datamodel.errorqueue: 

664 if evNumbersToRetry and eventNumber not in evNumbersToRetry: 

665 # Ignore eventNumber absent from evNumbersToRetry, 

666 # excepted on first iteration of while loop 

667 continue 

668 

669 if remoteEvent is not None: 

670 if self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

671 remoteEvent, False 

672 ): 

673 __hermes__.logger.info( 

674 f"Won't retry remote event {remoteEvent} from error queue" 

675 " as it is still a dependency of another error" 

676 ) 

677 retryQueue.append(eventNumber) 

678 continue 

679 

680 __hermes__.logger.info( 

681 f"Retrying to process remote event {remoteEvent} from error" 

682 " queue" 

683 ) 

684 try: 

685 self.__processRemoteEvent( 

686 remoteEvent, localEvent, enqueueEventWithError=False 

687 ) 

688 except HermesClientHandlerError as e: 

689 __hermes__.logger.info( 

690 f"... failed on step {self.currentStep}: {str(e)}" 

691 ) 

692 remoteEvent.step = self.currentStep 

693 remoteEvent.isPartiallyProcessed = self.isPartiallyProcessed 

694 localEvent.step = self.currentStep 

695 localEvent.isPartiallyProcessed = self.isPartiallyProcessed 

696 self.__datamodel.errorqueue.updateErrorMsg(eventNumber, e.msg) 

697 else: 

698 # If event has suppressed object, eventNumber has already been 

699 # purged from queue 

700 self.__datamodel.errorqueue.remove( 

701 eventNumber, ignoreMissingEventNumber=True 

702 ) 

703 else: 

704 if self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

705 localEvent, True 

706 ): 

707 __hermes__.logger.info( 

708 f"Won't retry local event {localEvent} from error queue" 

709 " as it is still a dependency of another error" 

710 ) 

711 retryQueue.append(eventNumber) 

712 continue 

713 __hermes__.logger.info( 

714 f"Retrying to process local event {localEvent} from error queue" 

715 ) 

716 try: 

717 self.__processLocalEvent( 

718 remoteEvent, localEvent, enqueueEventWithError=False 

719 ) 

720 except HermesClientHandlerError as e: 

721 __hermes__.logger.info( 

722 f"... failed on step {self.currentStep}: {str(e)}" 

723 ) 

724 localEvent.step = self.currentStep 

725 localEvent.isPartiallyProcessed = self.isPartiallyProcessed 

726 self.__datamodel.errorqueue.updateErrorMsg(eventNumber, e.msg) 

727 else: 

728 # If event has suppressed object, eventNumber has already been 

729 # purged from queue 

730 self.__datamodel.errorqueue.remove( 

731 eventNumber, ignoreMissingEventNumber=True 

732 ) 

733 while self.__isPaused and not self.__isStopped: 

734 sleep(1) # Allow loop to be paused 

735 if self.__isStopped: 

736 break # Allow loop to be interrupted if requested 

737 else: 

738 # Update __errorQueue_lastretry only if loop hasn't been interrupted 

739 self.__errorQueue_lastretry = now 

740 

741 done = previousKeys == self.__datamodel.errorqueue.keys() or not retryQueue 

742 if done: 

743 if previousKeys: 

744 __hermes__.logger.debug( 

745 f"End of retryerrorqueue {previousKeys=}" 

746 f" {self.__datamodel.errorqueue.keys()=} - {retryQueue=}" 

747 ) 

748 else: 

749 __hermes__.logger.debug( 

750 "As some event have been processed, will retry ignored events" 

751 f" {retryQueue}" 

752 ) 

753 evNumbersToRetry = retryQueue.copy() 

754 

755 self.__isAnErrorRetry = False # End of __retryErrorQueue() 

756 

757 def __emptyTrashBin(self, force: bool = False): 

758 # Enforce purgeInterval 

759 now = datetime.now() 

760 if ( 

761 not force 

762 and now < self.__trashbin_lastpurge + self.__trashbin_purgeInterval 

763 ): 

764 return # Too early to process again 

765 if self.__trashbin_retention is not None: 

766 retentionLimit = datetime.now() - self.__trashbin_retention 

767 

768 objtype: str 

769 objs: DataObjectList 

770 # As we'll remove objects, process data in the datamodel reversed declaration 

771 # order 

772 for objtype, objs in reversed(self.__datamodel.remotedata.items()): 

773 if not objtype.startswith("trashbin_"): 

774 continue 

775 

776 for pkey in objs.getPKeys(): 

777 obj = objs.get(pkey) 

778 if ( 

779 self.__trashbin_retention is None 

780 or obj._trashbin_timestamp < retentionLimit 

781 ): 

782 event = Event( 

783 evcategory="base", eventtype="removed", obj=obj, objattrs={} 

784 ) 

785 __hermes__.logger.info(f"Trying to purge {repr(obj)} from trashbin") 

786 if self.__datamodel.errorqueue.containsObjectByEvent( 

787 event, isLocalEvent=False 

788 ) and not self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

789 event, isLocalEvent=False 

790 ): 

791 try: 

792 self.__processRemoteEvent( 

793 event, local_event=None, enqueueEventWithError=False 

794 ) 

795 except HermesClientHandlerError: 

796 pass 

797 else: 

798 self.__processRemoteEvent( 

799 event, local_event=None, enqueueEventWithError=True 

800 ) 

801 

802 while self.__isPaused and not self.__isStopped: 

803 sleep(1) # Allow loop to be paused 

804 if self.__isStopped: 

805 break # Allow loop to be interrupted if requested 

806 

807 while self.__isPaused and not self.__isStopped: 

808 sleep(1) # Allow loop to be paused 

809 if self.__isStopped: 

810 break # Allow loop to be interrupted if requested 

811 else: 

812 # Update __trashbin_lastpurge only if loop hasn't been interrupted 

813 self.__trashbin_lastpurge = now 

814 

815 def __hasAlreadyBeenInitialized(self) -> bool: 

816 if ( 

817 self.__cache.initstartoffset is None 

818 or self.__cache.initstopoffset is None 

819 or self.__cache.nextoffset is None 

820 or self.__cache.nextoffset < self.__cache.initstopoffset 

821 ): 

822 return False 

823 return True 

824 

825 def __hasAtLeastBeganInitialization(self) -> bool: 

826 return ( 

827 self.__cache.initstartoffset is not None 

828 and self.__datamodel.hasRemoteSchema() 

829 ) 

830 

831 def __canBeInitialized(self) -> bool: 

832 self.__msgbus.seekToBeginning() 

833 

834 # List of (start, stop) offsets of initsync sequences found 

835 initSyncFound: list[tuple[Any, Any]] = [] 

836 

837 start = None 

838 stop = None 

839 event: Event 

840 for event in self.__msgbus: 

841 if event.evcategory != "initsync": 

842 continue 

843 if event.eventtype == "init-start": 

844 start = event.offset 

845 elif event.eventtype == "init-stop" and start is not None: 

846 stop = event.offset 

847 initSyncFound.append( 

848 (start, stop), 

849 ) 

850 if self.__useFirstInitsyncSequence: 

851 break # We found the first sequence 

852 else: 

853 # Continue to find a new complete sequence 

854 start = None 

855 stop = None 

856 

857 if not initSyncFound: 

858 return False 

859 

860 if self.__useFirstInitsyncSequence: 

861 start, stop = initSyncFound[0] 

862 else: 

863 start, stop = initSyncFound[-1] 

864 

865 if self.__cache.nextoffset is None or self.__cache.nextoffset < start: 

866 self.__cache.nextoffset = start 

867 

868 self.__cache.initstartoffset = start 

869 self.__cache.initstopoffset = stop 

870 __hermes__.logger.debug( 

871 "Init sequence was found in Kafka at offsets" 

872 f" [{self.__cache.initstartoffset} ; {self.__cache.initstopoffset}]" 

873 ) 

874 return True 

875 

876 def __updateSchema(self, newSchema: Dataschema): 

877 if self.__datamodel.forcePurgeOfTrashedObjectsWithoutNewPkeys( 

878 self.__datamodel.remote_schema, newSchema 

879 ): 

880 self.__emptyTrashBin(force=True) 

881 

882 self.__datamodel.updateSchema(newSchema) 

883 self.__config.savecachefile() # Save config to be able to rebuild datamodel 

884 # Save and reload error queue to purge it from events of any suppressed types 

885 self.__datamodel.saveErrorQueue() 

886 self.__datamodel.loadErrorQueue() 

887 self.__checkDatamodelWarnings() 

888 

889 def __checkDatamodelWarnings(self): 

890 if self.__datamodel.unknownRemoteTypes: 

891 __hermes__.logger.warning( 

892 "Datamodel errors: remote types" 

893 f" '{self.__datamodel.unknownRemoteTypes}'" 

894 " don't exist in current Dataschema" 

895 ) 

896 

897 if self.__datamodel.unknownRemoteAttributes: 

898 __hermes__.logger.warning( 

899 "Datamodel errors: remote attributes don't exist in current" 

900 f" Dataschema: {self.__datamodel.unknownRemoteAttributes}" 

901 ) 

902 self.__notifyDatamodelWarnings() 

903 

904 def __processEvents(self, isInitSync=False): 

905 remote_event: Event 

906 schema: Dataschema | None = None 

907 evcategory: str = "initsync" if isInitSync else "base" 

908 

909 self.__msgbus.seek(self.__cache.nextoffset) 

910 

911 for remote_event in self.__msgbus: 

912 self.__saveRequired = True 

913 if isInitSync and remote_event.offset > self.__cache.initstopoffset: 

914 # Should never be called 

915 self.__cache.nextoffset = remote_event.offset + 1 

916 break 

917 

918 # TODO: implement data consistency check if event.evcategory==initsync 

919 # and evcategory==base 

920 

921 if remote_event.evcategory != evcategory: 

922 self.__cache.nextoffset = remote_event.offset + 1 

923 continue 

924 

925 if isInitSync: 

926 if remote_event.eventtype == "init-start": 

927 schema = Dataschema.from_json(remote_event.objattrs) 

928 self.__updateSchema(schema) 

929 continue 

930 

931 if remote_event.eventtype == "init-stop": 

932 self.__cache.nextoffset = remote_event.offset + 1 

933 break 

934 

935 if schema is None and not self.__hasAtLeastBeganInitialization(): 

936 msg = "Invalid initsync sequence met, ignoring" 

937 __hermes__.logger.critical(msg) 

938 return 

939 

940 # Process "standard" message 

941 match remote_event.eventtype: 

942 case "added" | "modified" | "removed": 

943 self.__processRemoteEvent( 

944 remote_event, local_event=None, enqueueEventWithError=True 

945 ) 

946 case "dataschema": 

947 schema = Dataschema.from_json(remote_event.objattrs) 

948 self.__updateSchema(schema) 

949 case _: 

950 __hermes__.logger.error( 

951 "Received an event with unknown type" 

952 f" '{remote_event.eventtype}': ignored" 

953 ) 

954 

955 self.__cache.nextoffset = remote_event.offset + 1 

956 

957 while self.__isPaused and not self.__isStopped: 

958 sleep(1) # Allow loop to be paused 

959 if self.__isStopped: 

960 break # Allow loop to be interrupted if requested 

961 

962 def __processRemoteEvent( 

963 self, 

964 remote_event: Event | None, 

965 local_event: Event | None, 

966 enqueueEventWithError: bool, 

967 simulateOnly: bool = False, 

968 ): 

969 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

970 remote_event.objtype 

971 ) 

972 __hermes__.logger.debug( 

973 f"__processRemoteEvent({remote_event.toString(secretAttrs)})" 

974 ) 

975 self.__saveRequired = True 

976 

977 # In case of modification, try to compose full object in order to provide all 

978 # attributes values, in order to render Jinja Template with several vars. 

979 # In this specific case, one of the template var may have been modified, 

980 # but not the other, so the event attrs are not enough to process the 

981 # template rendering 

982 r_obj_complete: DataObject | None = None 

983 if remote_event.eventtype == "modified": 

984 cache_complete, r_cachedobj_complete = ( 

985 Datamodel.getObjectFromCacheOrTrashbin( 

986 self.__datamodel.remotedata_complete, 

987 remote_event.objtype, 

988 remote_event.objpkey, 

989 ) 

990 ) 

991 if r_cachedobj_complete is not None: 

992 r_obj_complete = Datamodel.getUpdatedObject( 

993 r_cachedobj_complete, remote_event.objattrs 

994 ) 

995 

996 # Should be always None, except when called from __retryErrorQueue() 

997 # In this case, we have to use the provided local_event, as it may contains 

998 # some extra changes stacked by autoremediation 

999 if local_event is None: 

1000 local_events = self.__datamodel.convertEventToLocal( 

1001 remote_event, r_obj_complete 

1002 ) 

1003 else: 

1004 local_events = [local_event] 

1005 

1006 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"] 

1007 was_in_trashbin = remote_event.objpkey in trashbin 

1008 

1009 if not simulateOnly and enqueueEventWithError: 

1010 hadErrors = self.__datamodel.errorqueue.containsObjectByEvent( 

1011 remote_event, isLocalEvent=False 

1012 ) 

1013 isParent = self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

1014 remote_event, isLocalEvent=False 

1015 ) 

1016 

1017 for local_event in local_events: 

1018 if not simulateOnly and enqueueEventWithError: 

1019 if hadErrors or ( 

1020 isParent and remote_event.eventtype in self.__foreignkeys_events 

1021 ): 

1022 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1023 remote_event.objtype 

1024 ) 

1025 if hadErrors: 

1026 errorMsg = ( 

1027 "Object in remote event" 

1028 f" {remote_event.toString(secretAttrs)} already had" 

1029 " unresolved errors: appending event to error queue" 

1030 ) 

1031 else: 

1032 errorMsg = ( 

1033 "Object in remote event" 

1034 f" {remote_event.toString(secretAttrs)} is a dependency of" 

1035 " an object that already had unresolved errors: appending" 

1036 " event to error queue" 

1037 ) 

1038 __hermes__.logger.warning(errorMsg) 

1039 self.__processRemoteEvent( 

1040 remote_event, 

1041 local_event=None, 

1042 enqueueEventWithError=False, 

1043 simulateOnly=True, 

1044 ) 

1045 if local_event is None: 

1046 # Force empty event generation when local_event doesn't change 

1047 # anything 

1048 l_events = self.__datamodel.convertEventToLocal( 

1049 remote_event, r_obj_complete, allowEmptyEvent=True 

1050 ) 

1051 for local_event in l_events: 

1052 self.__datamodel.errorqueue.append( 

1053 remote_event, local_event, errorMsg 

1054 ) 

1055 else: 

1056 self.__datamodel.errorqueue.append( 

1057 remote_event, local_event, errorMsg 

1058 ) 

1059 continue 

1060 

1061 try: 

1062 match remote_event.eventtype: 

1063 case "added": 

1064 if self.__trashbin_retention is not None and was_in_trashbin: 

1065 # Object is in trashbin, recycle it 

1066 self.__remoteRecycled( 

1067 remote_event, local_event, simulateOnly 

1068 ) 

1069 else: 

1070 # Add new object 

1071 self.__remoteAdded(remote_event, local_event, simulateOnly) 

1072 

1073 case "modified": 

1074 self.__remoteModified(remote_event, local_event, simulateOnly) 

1075 

1076 case "removed": 

1077 # Remove object on any of these conditions: 

1078 # - trashbin retention is disabled 

1079 # - object is already in trashbin 

1080 if self.__trashbin_retention is None or was_in_trashbin: 

1081 # Remove object 

1082 self.__remoteRemoved( 

1083 remote_event, local_event, simulateOnly 

1084 ) 

1085 else: 

1086 # Store object in trashbin 

1087 self.__remoteTrashed( 

1088 remote_event, local_event, simulateOnly 

1089 ) 

1090 except HermesClientHandlerError as e: 

1091 if not simulateOnly and enqueueEventWithError: 

1092 self.__processRemoteEvent( 

1093 remote_event, 

1094 local_event=None, 

1095 enqueueEventWithError=False, 

1096 simulateOnly=True, 

1097 ) 

1098 remote_event.step = self.currentStep 

1099 remote_event.isPartiallyProcessed = self.isPartiallyProcessed 

1100 local_event.step = self.currentStep 

1101 local_event.isPartiallyProcessed = self.isPartiallyProcessed 

1102 

1103 if local_event is None: 

1104 # Force empty event generation when local_event doesn't change 

1105 # anything 

1106 l_events = self.__datamodel.convertEventToLocal( 

1107 remote_event, r_obj_complete, allowEmptyEvent=True 

1108 ) 

1109 for local_event in l_events: 

1110 self.__datamodel.errorqueue.append( 

1111 remote_event, local_event, e.msg 

1112 ) 

1113 else: 

1114 self.__datamodel.errorqueue.append( 

1115 remote_event, local_event, e.msg 

1116 ) 

1117 else: 

1118 raise 

1119 

1120 def __processLocalEvent( 

1121 self, 

1122 remote_event: Event | None, 

1123 local_event: Event | None, 

1124 enqueueEventWithError: bool, 

1125 simulateOnly: bool = False, 

1126 ): 

1127 if local_event is None: 

1128 __hermes__.logger.debug("__processLocalEvent(None)") 

1129 return 

1130 

1131 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1132 local_event.objtype 

1133 ) 

1134 __hermes__.logger.debug( 

1135 f"__processLocalEvent({local_event.toString(secretAttrs)})" 

1136 ) 

1137 

1138 self.__saveRequired = True 

1139 

1140 if not simulateOnly: 

1141 # Reset current step 

1142 self.currentStep = local_event.step 

1143 self.isPartiallyProcessed = local_event.isPartiallyProcessed 

1144 

1145 if not simulateOnly and enqueueEventWithError: 

1146 hadErrors = self.__datamodel.errorqueue.containsObjectByEvent( 

1147 local_event, isLocalEvent=True 

1148 ) 

1149 isParent = self.__datamodel.errorqueue.isEventAParentOfAnotherError( 

1150 local_event, isLocalEvent=True 

1151 ) 

1152 if hadErrors or ( 

1153 isParent and local_event.eventtype in self.__foreignkeys_events 

1154 ): 

1155 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1156 local_event.objtype 

1157 ) 

1158 if hadErrors: 

1159 errorMsg = ( 

1160 f"Object in local event {local_event.toString(secretAttrs)}" 

1161 " already had unresolved errors: appending event to error queue" 

1162 ) 

1163 else: 

1164 errorMsg = ( 

1165 f"Object in local event {local_event.toString(secretAttrs)}" 

1166 " is a dependency of an object that already had unresolved" 

1167 " errors: appending event to error queue" 

1168 ) 

1169 __hermes__.logger.warning(errorMsg) 

1170 self.__processLocalEvent( 

1171 None, local_event, enqueueEventWithError=False, simulateOnly=True 

1172 ) 

1173 self.__datamodel.errorqueue.append(remote_event, local_event, errorMsg) 

1174 return 

1175 

1176 trashbin = self.__datamodel.localdata[f"trashbin_{local_event.objtype}"] 

1177 try: 

1178 match local_event.eventtype: 

1179 case "added": 

1180 if ( 

1181 self.__trashbin_retention is not None 

1182 and local_event.objpkey in trashbin 

1183 ): 

1184 # Object is in trashbin, recycle it 

1185 self.__localRecycled(local_event, simulateOnly) 

1186 else: 

1187 self.__localAdded(local_event, simulateOnly) 

1188 case "modified": 

1189 if not simulateOnly and local_event.objpkey in trashbin: 

1190 # Object is in trashbin, and cannot be modified until it is 

1191 # restored 

1192 if enqueueEventWithError: 

1193 # As the object changes will be processed at restore, 

1194 # ignore the change 

1195 self.__processLocalEvent( 

1196 None, 

1197 local_event, 

1198 enqueueEventWithError=False, 

1199 simulateOnly=True, 

1200 ) 

1201 else: 

1202 # Propagate error as requested 

1203 raise HermesClientHandlerError( 

1204 f"Object of event {repr(local_event)} is in trashbin," 

1205 " and cannot be modified until it is restored" 

1206 ) 

1207 else: 

1208 self.__localModified(local_event, simulateOnly) 

1209 case "removed": 

1210 # Remove object on any of these conditions: 

1211 # - trashbin retention is disabled 

1212 # - object is already in trashbin 

1213 if ( 

1214 self.__trashbin_retention is None 

1215 or local_event.objpkey in trashbin 

1216 ): 

1217 self.__localRemoved(local_event, simulateOnly) 

1218 else: 

1219 self.__localTrashed(local_event, simulateOnly) 

1220 except HermesClientHandlerError as e: 

1221 if not simulateOnly and enqueueEventWithError: 

1222 self.__processLocalEvent( 

1223 None, local_event, enqueueEventWithError=False, simulateOnly=True 

1224 ) 

1225 if remote_event is not None: 

1226 remote_event.step = self.currentStep 

1227 remote_event.isPartiallyProcessed = self.isPartiallyProcessed 

1228 local_event.step = self.currentStep 

1229 local_event.isPartiallyProcessed = self.isPartiallyProcessed 

1230 self.__datamodel.errorqueue.append(remote_event, local_event, e.msg) 

1231 else: 

1232 raise 

1233 

1234 def __remoteAdded( 

1235 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1236 ): 

1237 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1238 remote_event.objtype 

1239 ) 

1240 __hermes__.logger.debug(f"__remoteAdded({remote_event.toString(secretAttrs)})") 

1241 

1242 r_obj = self.__datamodel.createRemoteDataobject( 

1243 remote_event.objtype, remote_event.objattrs 

1244 ) 

1245 self.__processLocalEvent( 

1246 remote_event, 

1247 local_event, 

1248 enqueueEventWithError=False, 

1249 simulateOnly=simulateOnly, 

1250 ) 

1251 

1252 # Add remote object to cache 

1253 if not simulateOnly: 

1254 # May already been added if remote type is used in more than one local type 

1255 self.__datamodel.remotedata[remote_event.objtype].append( 

1256 r_obj, ignoreIfAlreadyPresent=True 

1257 ) 

1258 # May already been added if current event is from errorqueue 

1259 self.__datamodel.remotedata_complete[remote_event.objtype].append( 

1260 r_obj, ignoreIfAlreadyPresent=True 

1261 ) 

1262 

1263 def __localAdded(self, local_ev: Event, simulateOnly: bool = False): 

1264 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1265 local_ev.objtype 

1266 ) 

1267 __hermes__.logger.debug(f"__localAdded({local_ev.toString(secretAttrs)})") 

1268 

1269 l_obj = self.__datamodel.createLocalDataobject( 

1270 local_ev.objtype, local_ev.objattrs 

1271 ) 

1272 

1273 if not simulateOnly: 

1274 # Call added handler 

1275 self.__callHandler( 

1276 objtype=local_ev.objtype, 

1277 eventtype="added", 

1278 objkey=local_ev.objpkey, 

1279 eventattrs=local_ev.objattrs, 

1280 newobj=deepcopy(l_obj), 

1281 ) 

1282 

1283 # Add local object to cache 

1284 if not simulateOnly: 

1285 self.__datamodel.localdata[local_ev.objtype].append( 

1286 l_obj, ignoreIfAlreadyPresent=True 

1287 ) 

1288 # May already been added if current event is from errorqueue 

1289 self.__datamodel.localdata_complete[local_ev.objtype].append( 

1290 l_obj, ignoreIfAlreadyPresent=True 

1291 ) 

1292 

1293 def __remoteRecycled( 

1294 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1295 ): 

1296 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1297 remote_event.objtype 

1298 ) 

1299 __hermes__.logger.debug( 

1300 f"__remoteRecycled({remote_event.toString(secretAttrs)})" 

1301 ) 

1302 maincache = self.__datamodel.remotedata[remote_event.objtype] 

1303 maincache_complete = self.__datamodel.remotedata_complete[remote_event.objtype] 

1304 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"] 

1305 trashbin_complete = self.__datamodel.remotedata_complete[ 

1306 f"trashbin_{remote_event.objtype}" 

1307 ] 

1308 

1309 r_obj = self.__datamodel.createRemoteDataobject( 

1310 remote_event.objtype, remote_event.objattrs 

1311 ) 

1312 

1313 self.__processLocalEvent( 

1314 remote_event, 

1315 local_event, 

1316 enqueueEventWithError=False, 

1317 simulateOnly=simulateOnly, 

1318 ) 

1319 

1320 # Remove remote object from trashbin 

1321 if not simulateOnly: 

1322 trashbin.removeByPkey(remote_event.objpkey) 

1323 trashbin_complete.removeByPkey(remote_event.objpkey) 

1324 # Restore remote object, with its potential changes, in main cache 

1325 if not simulateOnly: 

1326 # May already been added if remote type is used in more than one local type 

1327 maincache.append(r_obj, ignoreIfAlreadyPresent=True) 

1328 # May already been recycled if current event is from errorqueue 

1329 maincache_complete.append(r_obj, ignoreIfAlreadyPresent=True) 

1330 

1331 def __localRecycled(self, local_ev: Event, simulateOnly: bool = False): 

1332 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1333 local_ev.objtype 

1334 ) 

1335 __hermes__.logger.debug(f"__localRecycled({local_ev.toString(secretAttrs)})") 

1336 maincache = self.__datamodel.localdata[local_ev.objtype] 

1337 maincache_complete = self.__datamodel.localdata_complete[local_ev.objtype] 

1338 trashbin = self.__datamodel.localdata[f"trashbin_{local_ev.objtype}"] 

1339 trashbin_complete = self.__datamodel.localdata_complete[ 

1340 f"trashbin_{local_ev.objtype}" 

1341 ] 

1342 

1343 l_obj = self.__datamodel.createLocalDataobject( 

1344 local_ev.objtype, local_ev.objattrs 

1345 ) 

1346 l_obj_trash: DataObject = deepcopy(trashbin.get(local_ev.objpkey)) 

1347 del l_obj_trash._trashbin_timestamp # Remove trashbin timestamp from object 

1348 

1349 l_obj_trash_complete: DataObject = deepcopy( 

1350 trashbin_complete.get(local_ev.objpkey) 

1351 ) 

1352 

1353 # May already been recycled if current event is from errorqueue 

1354 if l_obj_trash_complete is not None: 

1355 del ( 

1356 l_obj_trash_complete._trashbin_timestamp 

1357 ) # Remove trashbin timestamp from object 

1358 

1359 if not simulateOnly: 

1360 # Call recycled handler 

1361 self.__callHandler( 

1362 objtype=local_ev.objtype, 

1363 eventtype="recycled", 

1364 objkey=l_obj_trash.getPKey(), 

1365 eventattrs=l_obj_trash.toNative(), 

1366 newobj=deepcopy(l_obj_trash), 

1367 ) 

1368 

1369 if not simulateOnly: 

1370 # Remove local object from trashbin 

1371 trashbin.remove(l_obj_trash) 

1372 # Restore local object in main cache 

1373 maincache.append(l_obj_trash, ignoreIfAlreadyPresent=True) 

1374 

1375 # May already been recycled if current event is from errorqueue 

1376 if l_obj_trash_complete is not None: 

1377 trashbin_complete.remove(l_obj_trash_complete) 

1378 maincache_complete.append(l_obj_trash_complete, ignoreIfAlreadyPresent=True) 

1379 

1380 diff = l_obj.diffFrom(l_obj_trash) # Handle local object changes if any 

1381 if diff and not simulateOnly: 

1382 event, obj = Event.fromDiffItem( 

1383 diffitem=diff, 

1384 eventCategory=local_ev.evcategory, 

1385 changeType="modified", 

1386 ) 

1387 # Hack: we pass this second event (modified) to error queue in order to 

1388 # postpone its processing once all caches of previous one are up to date. 

1389 # Otherwise, if an error is met on this second event (modified), we'll try 

1390 # to reprocess the first one (recycled) 

1391 self.__datamodel.errorqueue.append( 

1392 remoteEvent=None, localEvent=event, errorMsg=None 

1393 ) 

1394 # ... and force error queue to be retried asap in order to process the 

1395 # pending event 

1396 self.__errorQueue_lastretry = datetime(year=1, month=1, day=1) 

1397 

1398 def __remoteModified( 

1399 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1400 ): 

1401 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1402 remote_event.objtype 

1403 ) 

1404 __hermes__.logger.debug( 

1405 f"__remoteModified({remote_event.toString(secretAttrs)})" 

1406 ) 

1407 maincache = self.__datamodel.remotedata[remote_event.objtype] 

1408 

1409 if not simulateOnly: 

1410 r_cachedobj: DataObject = maincache.get(remote_event.objpkey) 

1411 r_obj = Datamodel.getUpdatedObject(r_cachedobj, remote_event.objattrs) 

1412 

1413 cache_complete, r_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin( 

1414 self.__datamodel.remotedata_complete, 

1415 remote_event.objtype, 

1416 remote_event.objpkey, 

1417 ) 

1418 r_obj_complete = Datamodel.getUpdatedObject( 

1419 r_cachedobj_complete, remote_event.objattrs 

1420 ) 

1421 

1422 self.__processLocalEvent( 

1423 remote_event, 

1424 local_event, 

1425 enqueueEventWithError=False, 

1426 simulateOnly=simulateOnly, 

1427 ) 

1428 

1429 # Update remote object in cache 

1430 if not simulateOnly: 

1431 maincache.replace(r_obj) 

1432 

1433 # May not exist 

1434 if cache_complete is not None: 

1435 cache_complete.replace(r_obj_complete) 

1436 

1437 def __localModified(self, local_ev: Event, simulateOnly: bool = False): 

1438 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1439 local_ev.objtype 

1440 ) 

1441 __hermes__.logger.debug(f"__localModified({local_ev.toString(secretAttrs)})") 

1442 maincache = self.__datamodel.localdata[local_ev.objtype] 

1443 

1444 if not simulateOnly: 

1445 l_cachedobj: DataObject = maincache.get(local_ev.objpkey) 

1446 l_obj = Datamodel.getUpdatedObject(l_cachedobj, local_ev.objattrs) 

1447 

1448 cache_complete, l_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin( 

1449 self.__datamodel.localdata_complete, local_ev.objtype, local_ev.objpkey 

1450 ) 

1451 

1452 # May not exist 

1453 if cache_complete is not None: 

1454 l_obj_complete = Datamodel.getUpdatedObject( 

1455 l_cachedobj_complete, local_ev.objattrs 

1456 ) 

1457 

1458 if not simulateOnly: 

1459 # Call modified handler 

1460 self.__callHandler( 

1461 objtype=local_ev.objtype, 

1462 eventtype="modified", 

1463 objkey=local_ev.objpkey, 

1464 eventattrs=local_ev.objattrs, 

1465 newobj=deepcopy(l_obj), 

1466 cachedobj=deepcopy(l_cachedobj), 

1467 ) 

1468 

1469 # Update local object in cache 

1470 if not simulateOnly: 

1471 maincache.replace(l_obj) 

1472 

1473 # May not exist 

1474 if cache_complete is not None: 

1475 cache_complete.replace(l_obj_complete) 

1476 

1477 def __remoteTrashed( 

1478 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1479 ): 

1480 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1481 remote_event.objtype 

1482 ) 

1483 __hermes__.logger.debug( 

1484 f"__remoteTrashed({remote_event.toString(secretAttrs)})" 

1485 ) 

1486 maincache = self.__datamodel.remotedata[remote_event.objtype] 

1487 maincache_complete = self.__datamodel.remotedata_complete[remote_event.objtype] 

1488 trashbin = self.__datamodel.remotedata[f"trashbin_{remote_event.objtype}"] 

1489 trashbin_complete = self.__datamodel.remotedata_complete[ 

1490 f"trashbin_{remote_event.objtype}" 

1491 ] 

1492 

1493 r_cachedobj: DataObject = maincache.get(remote_event.objpkey) 

1494 r_cachedobj_complete: DataObject = maincache_complete.get(remote_event.objpkey) 

1495 

1496 self.__processLocalEvent( 

1497 remote_event, 

1498 local_event, 

1499 enqueueEventWithError=False, 

1500 simulateOnly=simulateOnly, 

1501 ) 

1502 

1503 if not simulateOnly: 

1504 # Remove remote object from cache 

1505 # May already been removed if remote type is used in more than one local 

1506 # type 

1507 if r_cachedobj is not None: 

1508 maincache.remove(r_cachedobj) 

1509 r_cachedobj._trashbin_timestamp = remote_event.timestamp 

1510 # Add remote object to trashbin 

1511 # May already been added if remote type is used in more than one local 

1512 # type 

1513 trashbin.append(r_cachedobj, ignoreIfAlreadyPresent=True) 

1514 

1515 if r_cachedobj_complete is not None: 

1516 maincache_complete.remove(r_cachedobj_complete) 

1517 r_cachedobj_complete._trashbin_timestamp = remote_event.timestamp 

1518 # May already been added if remote type is used in more than one local type 

1519 trashbin_complete.append(r_cachedobj_complete, ignoreIfAlreadyPresent=True) 

1520 

1521 def __localTrashed(self, local_ev: Event, simulateOnly: bool = False): 

1522 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1523 local_ev.objtype 

1524 ) 

1525 __hermes__.logger.debug(f"__localTrashed({local_ev.toString(secretAttrs)})") 

1526 maincache = self.__datamodel.localdata[local_ev.objtype] 

1527 maincache_complete = self.__datamodel.localdata_complete[local_ev.objtype] 

1528 trashbin = self.__datamodel.localdata[f"trashbin_{local_ev.objtype}"] 

1529 trashbin_complete = self.__datamodel.localdata_complete[ 

1530 f"trashbin_{local_ev.objtype}" 

1531 ] 

1532 

1533 l_cachedobj: DataObject = maincache.get(local_ev.objpkey) 

1534 l_cachedobj_complete: DataObject = maincache_complete.get(local_ev.objpkey) 

1535 

1536 if not simulateOnly: 

1537 # Call trashed handler 

1538 self.__callHandler( 

1539 objtype=local_ev.objtype, 

1540 eventtype="trashed", 

1541 objkey=local_ev.objpkey, 

1542 eventattrs=local_ev.objattrs, 

1543 cachedobj=deepcopy(l_cachedobj), 

1544 ) 

1545 

1546 if not simulateOnly: 

1547 # Remove local object from cache 

1548 maincache.remove(l_cachedobj) 

1549 l_cachedobj._trashbin_timestamp = local_ev.timestamp 

1550 # Add local object to trashbin 

1551 trashbin.append(l_cachedobj, ignoreIfAlreadyPresent=True) 

1552 

1553 if l_cachedobj_complete is not None: 

1554 maincache_complete.remove(l_cachedobj_complete) 

1555 l_cachedobj_complete._trashbin_timestamp = local_ev.timestamp 

1556 trashbin_complete.append(l_cachedobj_complete, ignoreIfAlreadyPresent=True) 

1557 

1558 def __remoteRemoved( 

1559 self, remote_event: Event, local_event: Event, simulateOnly: bool = False 

1560 ): 

1561 secretAttrs = self.__datamodel.remote_schema.secretsAttributesOf( 

1562 remote_event.objtype 

1563 ) 

1564 __hermes__.logger.debug( 

1565 f"__remoteRemoved({remote_event.toString(secretAttrs)})" 

1566 ) 

1567 

1568 cache, r_cachedobj = Datamodel.getObjectFromCacheOrTrashbin( 

1569 self.__datamodel.remotedata, 

1570 remote_event.objtype, 

1571 remote_event.objpkey, 

1572 ) 

1573 

1574 cache_complete, r_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin( 

1575 self.__datamodel.remotedata_complete, 

1576 remote_event.objtype, 

1577 remote_event.objpkey, 

1578 ) 

1579 

1580 self.__processLocalEvent( 

1581 remote_event, 

1582 local_event, 

1583 enqueueEventWithError=False, 

1584 simulateOnly=simulateOnly, 

1585 ) 

1586 

1587 # Remove remote object from cache or trashbin 

1588 # May already been removed if remote type is used in more than one local type 

1589 if not simulateOnly: 

1590 if r_cachedobj is not None: 

1591 cache.remove(r_cachedobj) 

1592 

1593 # May already been removed if current event is from errorqueue 

1594 if cache_complete is not None: 

1595 # May already been removed if remote type is used in more than one local 

1596 # type 

1597 if r_cachedobj_complete is not None: 

1598 cache_complete.remove(r_cachedobj_complete) 

1599 

1600 def __localRemoved(self, local_ev: Event, simulateOnly: bool = False): 

1601 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1602 local_ev.objtype 

1603 ) 

1604 __hermes__.logger.debug(f"__localRemoved({local_ev.toString(secretAttrs)})") 

1605 

1606 cache, l_cachedobj = Datamodel.getObjectFromCacheOrTrashbin( 

1607 self.__datamodel.localdata, 

1608 local_ev.objtype, 

1609 local_ev.objpkey, 

1610 ) 

1611 

1612 cache_complete, l_cachedobj_complete = Datamodel.getObjectFromCacheOrTrashbin( 

1613 self.__datamodel.localdata_complete, 

1614 local_ev.objtype, 

1615 local_ev.objpkey, 

1616 ) 

1617 

1618 if not simulateOnly: 

1619 # Call removed handler 

1620 self.__callHandler( 

1621 objtype=local_ev.objtype, 

1622 eventtype="removed", 

1623 objkey=local_ev.objpkey, 

1624 eventattrs=local_ev.objattrs, 

1625 cachedobj=deepcopy(l_cachedobj), 

1626 ) 

1627 

1628 # Remove local object from cache or trashbin 

1629 if not simulateOnly: 

1630 cache.remove(l_cachedobj) 

1631 # May already been removed if current event is from errorqueue 

1632 if cache_complete is not None: 

1633 cache_complete.remove(l_cachedobj_complete) 

1634 

1635 def __callHandler(self, objtype: str, eventtype: str, **kwargs): 

1636 if not objtype: 

1637 handlerName = f"on_{eventtype}" 

1638 kwargs_filtered = kwargs 

1639 else: 

1640 handlerName = f"on_{objtype}_{eventtype}" 

1641 

1642 # Filter secrets values 

1643 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf(objtype) 

1644 kwargs_filtered = kwargs.copy() 

1645 kwargs_filtered["eventattrs"] = Event.objattrsToString( 

1646 kwargs["eventattrs"], secretAttrs 

1647 ) 

1648 

1649 kwargsstr = ", ".join([f"{k}={repr(v)}" for k, v in kwargs_filtered.items()]) 

1650 

1651 hdlr = getattr(self, handlerName, None) 

1652 

1653 if not callable(hdlr): 

1654 __hermes__.logger.debug( 

1655 f"Calling '{handlerName}({kwargsstr})': handler '{handlerName}()'" 

1656 " doesn't exists" 

1657 ) 

1658 return 

1659 

1660 __hermes__.logger.info( 

1661 f"Calling '{handlerName}({kwargsstr})' - currentStep={self.currentStep}," 

1662 f" isPartiallyProcessed={self.isPartiallyProcessed}," 

1663 f" isAnErrorRetry={self.isAnErrorRetry}" 

1664 ) 

1665 

1666 try: 

1667 hdlr(**kwargs) 

1668 except Exception as e: 

1669 __hermes__.logger.error( 

1670 f"Calling '{handlerName}({kwargsstr})': error met on step" 

1671 f" {self.currentStep} '{str(e)}'" 

1672 ) 

1673 raise HermesClientHandlerError(e) 

1674 

1675 def __processDatamodelUpdate(self): 

1676 """Check difference between current datamodel and previous one. If datamodel has 

1677 changed, generate local events according to datamodel changes""" 

1678 

1679 diff = self.__newdatamodel.diffFrom(self.__datamodel) 

1680 

1681 if not diff: 

1682 __hermes__.logger.info("No change in datamodel") 

1683 # Start working with new datamodel 

1684 self.__datamodel = self.__newdatamodel 

1685 self.__datamodel.loadErrorQueue() 

1686 return 

1687 

1688 __hermes__.logger.info(f"Datamodel has changed: {diff.dict}") 

1689 self.__saveRequired = True 

1690 

1691 # Start by removed types, as it requires the previous datamodel to process data 

1692 # removal 

1693 if diff.removed: 

1694 for l_objtype in diff.removed: 

1695 __hermes__.logger.info(f"About to purge data from type '{l_objtype}'") 

1696 for ltypes in self.__datamodel.typesmapping.values(): 

1697 if l_objtype in ltypes: 

1698 break 

1699 else: 

1700 # l_objtype was not found in each list from 

1701 # self.__datamodel.typesmapping.values() 

1702 __hermes__.logger.warning( 

1703 f"Requested to purge data from type '{l_objtype}', but it" 

1704 " doesn't exist in previous datamodel: ignoring" 

1705 ) 

1706 continue 

1707 

1708 pkeys = ( 

1709 self.__datamodel.localdata[l_objtype].getPKeys() 

1710 | self.__datamodel.localdata[f"trashbin_{l_objtype}"].getPKeys() 

1711 ) 

1712 

1713 # Call remove on each object 

1714 for pkey in pkeys: 

1715 _, l_obj = Datamodel.getObjectFromCacheOrTrashbin( 

1716 self.__datamodel.localdata, l_objtype, pkey 

1717 ) 

1718 if l_obj: 

1719 l_ev = Event( 

1720 evcategory="base", 

1721 eventtype="removed", 

1722 obj=l_obj, 

1723 objattrs={}, 

1724 ) 

1725 secretAttrs = self.__datamodel.local_schema.secretsAttributesOf( 

1726 l_objtype 

1727 ) 

1728 __hermes__.logger.debug( 

1729 f"Removing local object of {pkey=}:" 

1730 f" {l_ev.toString(secretAttrs)=}" 

1731 ) 

1732 self.__localRemoved(l_ev) 

1733 else: 

1734 __hermes__.logger.error(f"Local object of {pkey=} not found") 

1735 

1736 # All objects have been removed, remove remaining events from 

1737 # errorqueue, if any 

1738 for l_obj in self.__datamodel.localdata_complete[l_objtype]: 

1739 # Remove eventual events relative to current object from error queue 

1740 self.__datamodel.errorqueue.purgeAllEventsOfDataObject( 

1741 l_obj, isLocalObjtype=True 

1742 ) 

1743 

1744 self.__datamodel.saveLocalAndRemoteData() # Save changes 

1745 

1746 # Purge old remote and local cache files, if any 

1747 __hermes__.logger.info( 

1748 f"Types removed from Datamodel: {diff.removed}, purging cache files" 

1749 ) 

1750 Datamodel.purgeOldCacheFiles(diff.removed, cacheFilePrefix="__") 

1751 

1752 # Start working with new datamodel 

1753 self.__datamodel = self.__newdatamodel 

1754 self.__datamodel.loadLocalAndRemoteData() 

1755 

1756 # Reload error queue to allow it to handle new datamodel 

1757 self.__datamodel.saveErrorQueue() 

1758 self.__datamodel.loadErrorQueue() 

1759 

1760 if diff.added or diff.modified: 

1761 # Generate diff events according to datamodel changes 

1762 new_local_data: dict[str, DataObjectList] = {} 

1763 

1764 # Work on "complete" copy of data cache, representing the cache that should 

1765 # be without any event in error queue in order to compute diff on complete 

1766 # cache representation 

1767 completeRemoteData = self.__datamodel.remotedata_complete 

1768 completeLocalData = self.__datamodel.localdata_complete 

1769 

1770 # Loop over each remote type 

1771 for r_objtype in self.__datamodel.remote_schema.objectTypes: 

1772 # For each type, we'll work on classic data, and on trashbin data 

1773 for prefix in ("", "trashbin_"): 

1774 if r_objtype not in self.__datamodel.typesmapping: 

1775 continue # Remote objtype isn't set in current Datamodel 

1776 

1777 # Fetch corresponding local type 

1778 for l_objt in self.__datamodel.typesmapping[r_objtype]: 

1779 l_objtype = f"{prefix}{l_objt}" 

1780 

1781 # Convert remote data cache to local data 

1782 new_local_data[l_objtype] = ( 

1783 self.__datamodel.convertDataObjectListToLocal( 

1784 r_objtype, 

1785 completeRemoteData[f"{prefix}{r_objtype}"], 

1786 l_objt, 

1787 ) 

1788 ) 

1789 

1790 # Compute differences between new local data and local data 

1791 # cache 

1792 completeLocalDataObjtype: DataObjectList = ( 

1793 completeLocalData.get(l_objtype, DataObjectList([])) 

1794 ) 

1795 datadiff = new_local_data[l_objtype].diffFrom( 

1796 completeLocalDataObjtype 

1797 ) 

1798 

1799 for changeType, difflist in datadiff.dict.items(): 

1800 diffitem: DiffObject | DataObject 

1801 for diffitem in difflist: 

1802 # Convert diffitem to local Event 

1803 event, obj = Event.fromDiffItem( 

1804 diffitem=diffitem, 

1805 eventCategory="base", 

1806 changeType=changeType, 

1807 ) 

1808 

1809 if prefix == "trashbin_": 

1810 if obj not in completeLocalDataObjtype: 

1811 # Object exists in remote trashbin, but not in 

1812 # local one as it has been removed before its 

1813 # type was added to client's Datamodel. 

1814 # Process a local "added" event, then a local 

1815 # "removed" event to store local object in 

1816 # trashbin 

1817 

1818 # Add local object 

1819 self.__processLocalEvent( 

1820 None, event, enqueueEventWithError=True 

1821 ) 

1822 

1823 # Prepare "removed" event 

1824 event = Event( 

1825 evcategory="base", 

1826 eventtype="removed", 

1827 obj=obj, 

1828 objattrs={}, 

1829 ) 

1830 # Preserve object _trashbin_timestamp 

1831 event.timestamp = completeRemoteData[ 

1832 f"{prefix}{r_objtype}" 

1833 ][obj]._trashbin_timestamp 

1834 else: 

1835 # Preserve object _trashbin_timestamp 

1836 obj._trashbin_timestamp = ( 

1837 completeLocalDataObjtype[ 

1838 obj 

1839 ]._trashbin_timestamp 

1840 ) 

1841 

1842 # Process Event and update cache if no error is met, 

1843 # enqueue event otherwise 

1844 self.__processLocalEvent( 

1845 None, event, enqueueEventWithError=True 

1846 ) 

1847 

1848 self.__config.savecachefile() # Save config to be able to rebuild datamodel 

1849 self.__datamodel.saveLocalAndRemoteData() # Save data 

1850 self.__checkDatamodelWarnings() 

1851 

1852 def __status( 

1853 self, verbose=False, level="information", ignoreUnhandledExceptions=False 

1854 ) -> dict[str, dict[str, dict[str, Any]]]: 

1855 """Returns a dict containing status for current client Datamodel and error 

1856 queue. 

1857 

1858 Each status contains 3 categories/levels: "information", "warning" and "error" 

1859 """ 

1860 if level not in ("information", "warning", "error"): 

1861 raise AttributeError( 

1862 f"Specified level '{level}' is invalid. Possible values are" 

1863 """("information", "warning", "error"):""" 

1864 ) 

1865 

1866 appname: str = self.__config["appname"] 

1867 

1868 match level: 

1869 case "error": 

1870 levels = ["error"] 

1871 case "warning": 

1872 levels = [ 

1873 "warning", 

1874 "error", 

1875 ] 

1876 case "information": 

1877 levels = [ 

1878 "information", 

1879 "warning", 

1880 "error", 

1881 ] 

1882 

1883 res = { 

1884 appname: { 

1885 "information": { 

1886 "startTime": self.__startTime.strftime("%Y-%m-%d %H:%M:%S"), 

1887 "status": "paused" if self.__isPaused else "running", 

1888 "pausedSince": ( 

1889 self.__isPaused.strftime("%Y-%m-%d %H:%M:%S") 

1890 if self.__isPaused 

1891 else "None" 

1892 ), 

1893 }, 

1894 "warning": {}, 

1895 "error": {}, 

1896 }, 

1897 } 

1898 if not ignoreUnhandledExceptions and self.__cache.exception: 

1899 res[appname]["error"]["unhandledException"] = self.__cache.exception 

1900 

1901 # Datamodel 

1902 res["datamodel"] = { 

1903 "information": {}, 

1904 "warning": {}, 

1905 "error": {}, 

1906 } 

1907 if self.__datamodel.unknownRemoteTypes: 

1908 res["datamodel"]["warning"]["unknownRemoteTypes"] = sorted( 

1909 self.__datamodel.unknownRemoteTypes 

1910 ) 

1911 if self.__datamodel.unknownRemoteAttributes: 

1912 res["datamodel"]["warning"]["unknownRemoteAttributes"] = { 

1913 k: sorted(v) 

1914 for k, v in self.__datamodel.unknownRemoteAttributes.items() 

1915 } 

1916 

1917 # Error queue 

1918 res["errorQueue"] = { 

1919 "information": {}, 

1920 "warning": {}, 

1921 "error": {}, 

1922 } 

1923 

1924 if self.__datamodel.errorqueue is not None: 

1925 eventNumber: int 

1926 remoteEvent: Event | None 

1927 localEvent: Event 

1928 errorMsg: str 

1929 for ( 

1930 eventNumber, 

1931 remoteEvent, 

1932 localEvent, 

1933 errorMsg, 

1934 ) in self.__datamodel.errorqueue: 

1935 if errorMsg is None: 

1936 # Ignore the events in queue that are not errors 

1937 continue 

1938 

1939 # Always try to get object from local cache in order to use configured 

1940 # toString template for obj repr() 

1941 objtype = localEvent.objtype 

1942 

1943 _, obj = Datamodel.getObjectFromCacheOrTrashbin( 

1944 self.__datamodel.localdata_complete, objtype, localEvent.objpkey 

1945 ) 

1946 if obj is None: 

1947 _, obj = Datamodel.getObjectFromCacheOrTrashbin( 

1948 self.__datamodel.localdata, objtype, localEvent.objpkey 

1949 ) 

1950 if obj is None and remoteEvent is not None: 

1951 _, obj = Datamodel.getObjectFromCacheOrTrashbin( 

1952 self.__datamodel.remotedata_complete, 

1953 remoteEvent.objtype, 

1954 remoteEvent.objpkey, 

1955 ) 

1956 if obj is None: 

1957 obj = f"<{localEvent.objtype}[{localEvent.objpkey}]>" 

1958 else: 

1959 obj = repr(obj) 

1960 

1961 res["errorQueue"]["error"][eventNumber] = { 

1962 "objrepr": obj, 

1963 "errorMsg": errorMsg, 

1964 } 

1965 if verbose: 

1966 res["errorQueue"]["error"][eventNumber] |= { 

1967 "objtype": localEvent.objtype, 

1968 "objpkey": localEvent.objpkey, 

1969 "objattrs": localEvent.objattrs, 

1970 } 

1971 

1972 # Clean empty categories 

1973 for objname in list(res.keys()): 

1974 for category in ("information", "warning", "error"): 

1975 if category not in levels or ( 

1976 not verbose and not res[objname][category] 

1977 ): 

1978 del res[objname][category] 

1979 

1980 if not verbose and not res[objname]: 

1981 del res[objname] 

1982 

1983 return res 

1984 

1985 def __notifyQueueErrors(self): 

1986 """Notify of any objects change in error queue""" 

1987 new_error = self.__status(level="error", ignoreUnhandledExceptions=True) 

1988 

1989 new_errors = {} 

1990 if "errorQueue" in new_error: 

1991 for errNumber, err in new_error["errorQueue"]["error"].items(): 

1992 new_errors[errNumber] = f"{err['objrepr']}: {err['errorMsg']}" 

1993 

1994 new_errstr = json.dumps( 

1995 new_errors, 

1996 cls=JSONEncoder, 

1997 indent=4, 

1998 ) 

1999 old_errstr = json.dumps(self.__cache.queueErrors, cls=JSONEncoder, indent=4) 

2000 

2001 if new_errstr != old_errstr: 

2002 if new_errors: 

2003 desc = "objects in error queue have changed" 

2004 else: 

2005 desc = "no more objects in error queue" 

2006 

2007 __hermes__.logger.info(desc) 

2008 Email.sendDiff( 

2009 config=self.__config, 

2010 contentdesc=desc, 

2011 previous=old_errstr, 

2012 current=new_errstr, 

2013 ) 

2014 self.__cache.queueErrors = new_errors 

2015 

2016 def __notifyDatamodelWarnings(self): 

2017 """Notify of any data model warnings changes""" 

2018 new_errors = self.__status(level="warning", ignoreUnhandledExceptions=True) 

2019 

2020 if "datamodel" not in new_errors: 

2021 new_errors = {} 

2022 else: 

2023 new_errors = new_errors["datamodel"] 

2024 

2025 new_errstr = json.dumps( 

2026 new_errors, 

2027 cls=JSONEncoder, 

2028 indent=4, 

2029 ) 

2030 old_errstr = json.dumps( 

2031 self.__cache.datamodelWarnings, cls=JSONEncoder, indent=4 

2032 ) 

2033 

2034 if new_errors: 

2035 __hermes__.logger.error("Datamodel has warnings:\n" + new_errstr) 

2036 

2037 if new_errstr != old_errstr: 

2038 if new_errors: 

2039 desc = "datamodel warnings have changed" 

2040 else: 

2041 desc = "no more datamodel warnings" 

2042 

2043 __hermes__.logger.info(desc) 

2044 Email.sendDiff( 

2045 config=self.__config, 

2046 contentdesc=desc, 

2047 previous=old_errstr, 

2048 current=new_errstr, 

2049 ) 

2050 self.__cache.datamodelWarnings = new_errors 

2051 

2052 def __notifyException(self, trace: str | None): 

2053 """Notify of any unhandled exception met/solved""" 

2054 if trace: 

2055 __hermes__.logger.critical(f"Unhandled exception: {trace}") 

2056 

2057 if self.__cache.exception != trace: 

2058 if trace: 

2059 desc = "unhandled exception" 

2060 else: 

2061 desc = "no more unhandled exception" 

2062 

2063 __hermes__.logger.info(desc) 

2064 previous = "" if self.__cache.exception is None else self.__cache.exception 

2065 current = "" if trace is None else trace 

2066 Email.sendDiff( 

2067 config=self.__config, 

2068 contentdesc=desc, 

2069 previous=previous, 

2070 current=current, 

2071 ) 

2072 self.__cache.exception = trace 

2073 

2074 def __notifyFatalException(self, trace: str): 

2075 """Notify of any fatal exception met before, and terminate app""" 

2076 self.__isStopped = True 

2077 

2078 desc = "Unhandled fatal exception, APP WILL TERMINATE IMMEDIATELY" 

2079 NL = "\n" 

2080 

2081 __hermes__.logger.critical(f"{desc}: {trace}") 

2082 

2083 Email.send( 

2084 config=self.__config, 

2085 subject=f"[{self.__config['appname']}] {desc}", 

2086 content=f"{desc}:{NL}{NL}{trace}", 

2087 )