Coverage for server/hermesserver.py: 69%

360 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:25 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from lib.config import HermesConfig 

24from lib.version import HERMES_VERSION 

25from lib.datamodel.dataschema import Dataschema 

26from lib.datamodel.dataobject import DataObject 

27from lib.datamodel.datasource import Datasource 

28from lib.datamodel.diffobject import DiffObject 

29from lib.datamodel.event import Event 

30from lib.datamodel.serialization import LocalCache, JSONEncoder 

31from lib.plugins import AbstractMessageBusProducerPlugin, FailedToSendEventError 

32from lib.utils.mail import Email 

33from lib.utils.socket import ( 

34 SockServer, 

35 SocketMessageToServer, 

36 SocketMessageToClient, 

37 SocketArgumentParser, 

38 SocketParsingError, 

39 SocketParsingMessage, 

40) 

41from server.datamodel import Datamodel 

42 

43from datetime import datetime, timedelta 

44import argparse 

45import json 

46import time 

47import signal 

48import traceback 

49from types import FrameType 

50from typing import Any 

51 

52 

class HermesServerCache(LocalCache):
    """Hermes server data to cache.

    The cached attributes are the ones listed in ``jsondataattr`` below:
    "lastUpdate", "errors" and "exception".
    """

    def __init__(self, from_json_dict: dict[str, Any] | None = None):
        """Set up the cache content.

        from_json_dict: optional dict providing the initial values of "lastUpdate",
        "errors" and "exception". Any missing key falls back to None, or to an empty
        dict for "errors". When omitted, every attribute gets its fallback value.
        """
        super().__init__(
            jsondataattr=["lastUpdate", "errors", "exception"],
            cachefilename="_hermes-server",
        )

        # None is used as default value rather than a `{}` literal, which would be a
        # single dict instance shared by every call
        if from_json_dict is None:
            from_json_dict = {}

        self.lastUpdate: datetime | None = from_json_dict.get("lastUpdate")
        """Datetime of latest update"""

        self.errors: dict[str, dict[str, dict[str, Any]]] = from_json_dict.get(
            "errors", {}
        )
        """Dictionary containing current errors, for notifications"""

        self.exception: str | None = from_json_dict.get("exception")
        """String containing latest exception trace"""

    def savecachefile(self, cacheFilename: str | None = None):
        """Override method only to disable backup files in cache"""
        return super().savecachefile(cacheFilename, dontKeepBackup=True)

    # Example of cache migration method
    # @classmethod
    # def migrate_from_v0_0_3_to_v0_1_0(
    #     cls: "HermesServerCache", jsondict: Any | dict[Any, Any]
    # ) -> Any | dict[Any, Any]:
    #     jsondict["lastUpdate"] = datetime.fromisoformat(jsondict["lastUpdate"])
    #     return jsondict

84 

85 

86class HermesServer: 

87 """Hermes-server main class""" 

88 

def __init__(self, config: HermesConfig):
    """Build a server instance from the specified config.

    Nothing is processed yet: the mainloop() method MUST then be called to start
    the service"""

    __hermes__.logger.info(f"Starting {config['appname']} v{HERMES_VERSION}")

    # Register our handler for the signals managed by config
    config.setSignalsHandler(self.signalHandler)

    self.config: HermesConfig = config
    pluginsConf = self.config["hermes"]["plugins"]
    self._msgbus: AbstractMessageBusProducerPlugin = pluginsConf["messagebus"][
        "plugininstance"
    ]
    self.dm: Datamodel = Datamodel(self.config)

    self._cache: HermesServerCache = HermesServerCache.loadcachefile(
        "_hermes-server"
    )
    """Attributes persisted in cache"""

    self._initSyncRequested: bool = False
    """True when an initsync sequence is pending"""
    self._isStopped: bool = False
    """mainloop() keeps running while this is False"""
    self._isPaused: datetime | None = None
    """Datetime of the pause request when processing is paused, None otherwise"""
    self._forceUpdate = False
    """True when a (forced) update command is pending"""
    intervalSeconds = config["hermes-server"]["updateInterval"]
    self._updateInterval: timedelta = timedelta(seconds=intervalSeconds)
    """Delay between two updates"""
    self._numberOfLoopToProcess: int | None = None
    """**For functionnal tests only**, if a value is set, will process for *value*
    iterations of mainloop and pause execution until a new positive value is set"""

    self._firstFetchDone = False
    """True once a full data set has been fetched since start"""

    now = datetime.now()
    self._nextUpdate: datetime = now
    """Datetime to wait for before processing next update"""
    if self._cache.lastUpdate:
        # Honour the update interval across restarts
        scheduled = self._cache.lastUpdate + self._updateInterval
        if now < scheduled:
            self._nextUpdate = scheduled

    self.startTime: datetime | None = None
    """Datetime when mainloop was started"""

    self._sock: SockServer | None = None
    sockConf = config["hermes"]["cli_socket"]
    if not (sockConf["path"] is None and sockConf["dont_manage_sockfile"] is None):
        self._sock = SockServer(
            path=sockConf["path"],
            owner=sockConf["owner"],
            group=sockConf["group"],
            mode=sockConf["mode"],
            processHdlr=self._processSocketMessage,
            dontManageSockfile=sockConf["dont_manage_sockfile"],
        )
        self.__setupSocketParser()

156 

def __setupSocketParser(self):
    """Build the argparse parser (self._parser) handling the commands received on
    unix socket"""
    self._parser = SocketArgumentParser(
        prog=f"{self.config['appname']}-cli",
        description="Hermes Server CLI",
        exit_on_error=False,
    )
    subparsers = self._parser.add_subparsers(help="Sub-commands")

    # (sub-command, help, handler), registered in this exact order
    commands = (
        (
            "initsync",
            "Send specific init message containing all data but passwords."
            " Useful to fill new client",
            self.sock_initsync,
        ),
        ("update", "Force update now, ignoring updateInterval", self.sock_update),
        ("quit", "Stop server", self.sock_quit),
        (
            "pause",
            "Pause processing until 'resume' command is sent",
            self.sock_pause,
        ),
        (
            "resume",
            "Resume processing that has been paused with 'pause'",
            self.sock_resume,
        ),
        ("status", "Show server status", self.sock_status),
    )
    registered = {}
    for cmdname, cmdhelp, handler in commands:
        subparser = subparsers.add_parser(cmdname, help=cmdhelp)
        subparser.set_defaults(func=handler)
        registered[cmdname] = subparser

    # Only 'status' accepts options, both of them being boolean flags
    statusFlags = (
        ("-j", "--json", "Print status as json"),
        ("-v", "--verbose", "Output items without values"),
    )
    for shortname, longname, flaghelp in statusFlags:
        registered["status"].add_argument(
            shortname,
            longname,
            action="store_const",
            const=True,
            default=False,
            help=flaghelp,
        )

219 

def signalHandler(self, signalnumber: int, frame: FrameType | None):
    """Handler registered through config.setSignalsHandler(): logs the received
    signal and requests the server to stop"""
    signame = signal.strsignal(signalnumber)
    __hermes__.logger.critical(f"Signal '{signame}' received, terminating")
    self._isStopped = True

226 

def _processSocketMessage(
    self, msg: SocketMessageToServer
) -> SocketMessageToClient:
    """Handler that process specified msg received on unix socket and returns the
    answer to send.

    msg.argv is parsed with self._parser, then the handler registered as "func" for
    the sub-command is called with the parsed arguments.
    Any failure (parsing error, missing sub-command, exception raised by the
    handler, or handler returning nothing) is reported to the client as a reply
    with retcode 1 and an explanatory retmsg, instead of being raised.
    """
    try:
        args = self._parser.parse_args(msg.argv)
        if "func" not in args:
            # No sub-command specified: answer with the help message
            raise SocketParsingMessage(self._parser.format_help())
    except (SocketParsingError, SocketParsingMessage) as e:
        return SocketMessageToClient(retcode=1, retmsg=str(e))
    except argparse.ArgumentError as e:
        return SocketMessageToClient(
            retcode=1, retmsg=self._parser.format_error(str(e))
        )

    try:
        reply: SocketMessageToClient | None = args.func(args)
    except Exception as e:
        lines = traceback.format_exception(type(e), e, e.__traceback__)
        trace = "".join(lines).strip()
        __hermes__.logger.critical(f"Unhandled exception: {trace}")
        return SocketMessageToClient(retcode=1, retmsg=trace)

    if reply is None:
        # A handler that returns nothing must still produce a valid error reply
        # (previously this ended in an UnboundLocalError)
        return SocketMessageToClient(
            retcode=1, retmsg="Error: command handler did not return any reply"
        )

    return reply

255 

def sock_initsync(self, args: argparse.Namespace) -> SocketMessageToClient:
    """Handler of the initsync subcommand received on unix socket: flags the
    initsync as requested, and acknowledges"""
    ack = SocketMessageToClient(retcode=0, retmsg="")
    self._initSyncRequested = True
    return ack

261 

def sock_update(self, args: argparse.Namespace) -> SocketMessageToClient:
    """Handler of the update subcommand received on unix socket: flags the forced
    update as requested, and acknowledges"""
    ack = SocketMessageToClient(retcode=0, retmsg="")
    self._forceUpdate = True
    return ack

266 

def sock_quit(self, args: argparse.Namespace) -> SocketMessageToClient:
    """Handler of the quit subcommand received on unix socket: requests the server
    to stop, logs it, and acknowledges"""
    self._isStopped = True
    __hermes__.logger.info("hermes-server has been requested to quit")
    ack = SocketMessageToClient(retcode=0, retmsg="")
    return ack

272 

def sock_pause(self, args: argparse.Namespace) -> SocketMessageToClient:
    """Handler of the pause subcommand received on unix socket.

    Refused (retcode 1) when the server is being stopped or is already paused.
    Otherwise the current datetime is stored as pause start"""
    refusal: str | None = None
    if self._isStopped:
        refusal = "Error: server is currently being stopped"
    elif self._isPaused:
        refusal = "Error: server is already paused"

    if refusal is not None:
        return SocketMessageToClient(retcode=1, retmsg=refusal)

    __hermes__.logger.info("hermes-server has been requested to pause")
    self._isPaused = datetime.now()
    return SocketMessageToClient(retcode=0, retmsg="")

288 

def sock_resume(self, args: argparse.Namespace) -> SocketMessageToClient:
    """Handler of the resume subcommand received on unix socket.

    Refused (retcode 1) when the server is being stopped or is not paused.
    Otherwise the pause datetime is cleared"""
    refusal: str | None = None
    if self._isStopped:
        refusal = "Error: server is currently being stopped"
    elif not self._isPaused:
        refusal = "Error: server is not paused"

    if refusal is not None:
        return SocketMessageToClient(retcode=1, retmsg=refusal)

    __hermes__.logger.info("hermes-server has been requested to resume")
    self._isPaused = None
    return SocketMessageToClient(retcode=0, retmsg="")

304 

def sock_status(self, args: argparse.Namespace) -> SocketMessageToClient:
    """Handler called when status subcommand is requested on unix socket.

    args.verbose is passed as is to status(). When args.json is set, the status is
    returned as a JSON string, otherwise as a human readable text where
    "hermes-server" comes first, followed by the other entries in the order of the
    status dict.
    """
    status = self.status(verbose=args.verbose)
    if args.json:
        return SocketMessageToClient(
            retcode=0, retmsg=json.dumps(status, indent=4)
        )

    info2printable = {
        "inconsistencies": "Inconsistencies",
        "mergeConflicts": "Merge conflicts",
        "integrityFiltered": "Filtered by integrity constraints",
        "mergeFiltered": "Filtered by merge constraints",
    }

    # Keep the dict order for the remaining entries: going through a set made the
    # output order vary from one run to another
    objnames = ["hermes-server"] + [
        name for name in status if name != "hermes-server"
    ]

    lines: list[str] = []
    for objname in objnames:
        infos = status[objname]
        lines.append(f"{objname}:")
        for category in ("information", "warning", "error"):
            if category not in infos:
                continue
            if not infos[category]:
                lines.append(f" * {category.capitalize()}: []")
                continue

            lines.append(f" * {category.capitalize()}")
            for infoname, infodata in infos[category].items():
                # Keep multi-line values aligned under their item
                indentedinfodata = str(infodata).replace("\n", "\n ")
                lines.append(
                    f" - {info2printable.get(infoname, infoname)}:"
                    f" {indentedinfodata}"
                )

    return SocketMessageToClient(retcode=0, retmsg="\n".join(lines).rstrip())

339 

def _checkForSchemaChanges(self):
    """Compare the current dataschema with the one loaded from the "_dataschema"
    cache file, and process the differences if any.

    - added types are only logged
    - removed types: events are generated and sent from the difference between
      the cached data and the same data minus the removed types, then the cache
      files of the removed types are deleted
    - modified types: added/removed attributes and secrets attributes are logged.
      When secrets attributes were added, data cache is saved and reloaded
    - if the previous schema was not empty, a "dataschema" event containing the
      new schema is sent on message bus
    - the current dataschema is finally saved in its cache file
    """
    curschema: Dataschema = self.dm.dataschema
    oldschema: Dataschema = Dataschema.loadcachefile("_dataschema")
    diff = curschema.diffFrom(oldschema)

    if diff:
        old: dict[str, Any] = oldschema.schema
        new: dict[str, Any] = curschema.schema
        # An empty previous schema is reported as a first load
        if old:
            __hermes__.logger.info("Dataschema has changed since last run")
        else:
            __hermes__.logger.info("Loading first dataschema")

        if diff.added:
            __hermes__.logger.info(f"Types added in Dataschema: {diff.added}")

        if diff.removed:
            __hermes__.logger.info(
                f"Types removed from Dataschema: {diff.removed},"
                " generate events to mark data as deleted"
            )

            # Create a datasource with same content as cache, minus the types to
            # remove
            olddata: Datasource = Datasource(
                schema=oldschema, enableTrashbin=False, enableCache=False
            )
            olddata.loadFromCache()

            # Create an empty datasource and copy the data types to keep into it
            newdata: Datasource = Datasource(
                schema=oldschema, enableTrashbin=False, enableCache=False
            )
            for objtype in oldschema.schema.keys():
                if objtype not in diff.removed:
                    newdata[objtype] = olddata[objtype]

            # Send remove event of each entry of each removed type
            self.generateAndSendEvents(
                eventCategory="base",
                data=newdata,
                cache=olddata,
                save=True,
                commit=False,
                sendEvents=True,
            )

            __hermes__.logger.info(
                f"Types removed from Dataschema: {diff.removed},"
                " purging cache files"
            )
            for objtype in diff.removed:
                LocalCache.deleteAllCacheFiles(objtype)

        if diff.modified:
            for objtype in diff.modified:
                n = new[objtype]
                o = old[objtype]
                # HERMES_ATTRIBUTES
                added = n["HERMES_ATTRIBUTES"] - o["HERMES_ATTRIBUTES"]
                removed = o["HERMES_ATTRIBUTES"] - n["HERMES_ATTRIBUTES"]
                if added:
                    __hermes__.logger.info(
                        f"New attributes in dataschema type '{objtype}': {added}"
                    )
                if removed:
                    __hermes__.logger.info(
                        f"Removed attributes from dataschema type '{objtype}':"
                        f" {removed}"
                    )

                # SECRETS_ATTRIBUTES
                added = n["SECRETS_ATTRIBUTES"] - o["SECRETS_ATTRIBUTES"]
                removed = o["SECRETS_ATTRIBUTES"] - n["SECRETS_ATTRIBUTES"]
                if added:
                    __hermes__.logger.info(
                        f"New secrets attributes in dataschema type '{objtype}':"
                        f" {added}"
                    )
                    # We need to purge attribute from cache: as cache is loaded with
                    # attribute set up as SECRET, we just have to save the cache
                    # (attr won't be saved anymore, as it's SECRET) and reload
                    # cache to "forget" values loaded from previous cache
                    self.dm.data.cache.save()
                    self.dm.data.cache.loadFromCache()
                if removed:
                    __hermes__.logger.info(
                        "Removed secrets attributes from dataschema type"
                        f" '{objtype}': {removed}"
                    )

        # The new schema is sent only when a previous one existed
        if old:
            e = Event(
                evcategory="base",
                eventtype="dataschema",
                objattrs=new,
            )
            __hermes__.logger.info(
                f"Sending new schema on message bus {e.toString(set())}"
            )
            with self._msgbus:
                self._msgbus.send(event=e)

        # Persist the current schema, it will be the reference of the next comparison
        self.dm.dataschema.savecachefile()

444 

def mainLoop(self):
    """Server main loop.

    Starts the unix socket daemon when a socket is set up, processes dataschema
    changes until it succeeds, then loops until a stop is requested:
    - runs the pending initsync, if any
    - when an update is required (forced, or next update time reached and not
      paused), fetches data, generates and sends events, and saves the cache
    Any exception is notified through notifyException(), and followed by a
    60 seconds wait before retrying.
    The cache is saved a last time before returning.
    """
    self.startTime = datetime.now()

    if self._sock is not None:
        self._sock.startProcessMessagesDaemon(appname=__hermes__.appname)

    # Process schema changes if any, until it succeed
    checkForSchemaChangesDone = False
    while not self._isStopped and not checkForSchemaChangesDone:
        try:
            self._checkForSchemaChanges()
        except Exception as e:
            lines = traceback.format_exception(type(e), e, e.__traceback__)
            trace = "".join(lines).strip()
            self.notifyException(trace)
            self._cache.savecachefile()
            # Don't retry immediately: a persistent failure would otherwise turn
            # this loop into a busy loop flooding the logs
            self._waitBeforeRetry()
        else:
            checkForSchemaChangesDone = True

    # Reduce sleep duration during functional tests to speed them up
    sleepDuration = 1 if self._numberOfLoopToProcess is None else 0.05

    while not self._isStopped:
        try:
            if self._initSyncRequested:
                self.initsync()
                self._initSyncRequested = False

            if self._numberOfLoopToProcess is None:
                # Normal operations
                updateRequired = self._forceUpdate or (
                    not self._isPaused and datetime.now() >= self._nextUpdate
                )
            else:
                # Special case for functional tests
                updateRequired = self._numberOfLoopToProcess > 0

            if not updateRequired:
                time.sleep(sleepDuration)
                if self._nextUpdate + self._updateInterval < datetime.now():
                    # Keep updating _nextUpdate even when paused, ensuring that its
                    # value remains in the past. This will avoid an uninterrupted
                    # update sequence to make up for the pause time
                    self._nextUpdate += self._updateInterval
                continue

            # Standard run
            if self._forceUpdate:
                self._forceUpdate = False
            else:
                self._nextUpdate += self._updateInterval

            self.dm.fetch()
            self.generateAndSendEvents(
                eventCategory="base",
                data=self.dm.data,
                cache=self.dm.data.cache,
                save=True,
                commit=True,
                # No event is sent on the very first update
                sendEvents=(self._cache.lastUpdate is not None),
            )
            self._cache.lastUpdate = datetime.now()
            self.notifyException(None)
            self._cache.savecachefile()

        except Exception as e:
            lines = traceback.format_exception(type(e), e, e.__traceback__)
            trace = "".join(lines).strip()
            self.notifyException(trace)
            self._cache.savecachefile()
            self._waitBeforeRetry()

        # Only used in functionnal tests
        if self._numberOfLoopToProcess:
            self._numberOfLoopToProcess -= 1

    self._cache.savecachefile()

def _waitBeforeRetry(self, seconds: int = 60):
    """Log that an error was met, then wait for the specified number of seconds,
    returning early as soon as a stop has been requested"""
    __hermes__.logger.warning(
        f"An error was met. Waiting {seconds} seconds before retrying"
    )
    # Wait one second at a time to avoid waiting too long before stopping
    for _ in range(seconds):
        if self._isStopped:
            break
        time.sleep(1)

531 

def status(
    self, verbose=False, level="information", ignoreUnhandledExceptions=False
) -> dict[str, dict[str, dict[str, Any]]]:
    """Returns a dict containing status for hermes-server and each defined type in
    datamodel.

    Each status contains 3 categories/levels: "information", "warning" and "error"

    verbose: if False, the empty categories are removed from result, as well as
    the entries left without any category.
    level: lowest level to return; the more severe ones are returned too.
    ignoreUnhandledExceptions: if True, the cached exception trace is not reported.

    Raises AttributeError if specified level is invalid.
    """
    allLevels = ("information", "warning", "error")
    if level not in allLevels:
        raise AttributeError(
            f"Specified level '{level}' is invalid."
            """ Possible values are ("information", "warning", "error"):"""
        )

    # Levels are sorted by increasing severity: keep the requested one and above
    levels = allLevels[allLevels.index(level) :]

    def fmtdate(value) -> str:
        # Dates that are not set (yet) are reported as "None", startTime included
        # as status may be requested before mainLoop() was started
        return value.strftime("%Y-%m-%d %H:%M:%S") if value else "None"

    res = {
        "hermes-server": {
            "information": {
                "startTime": fmtdate(self.startTime),
                "status": "paused" if self._isPaused else "running",
                "pausedSince": fmtdate(self._isPaused),
                "lastUpdate": fmtdate(self._cache.lastUpdate),
                "nextUpdate": fmtdate(self._nextUpdate),
            },
            "warning": {},
            "error": {},
        },
    }
    if not ignoreUnhandledExceptions and self._cache.exception:
        res["hermes-server"]["error"]["unhandledException"] = self._cache.exception

    for objname, objlist in self.dm.data.items():
        res[objname] = {
            "information": {},
            "warning": {},
            "error": {},
        }
        if not self._firstFetchDone and objname in self._cache.errors:
            # Until the first fetch, rely on cached errors. Their categories are
            # copied, as they may be filled below and the cache must stay untouched
            for category, cached in self._cache.errors[objname].items():
                res[objname][category] = dict(cached)

        for severity, src in [
            ("error", "inconsistencies"),
            ("error", "mergeConflicts"),
            ("warning", "integrityFiltered"),
            ("warning", "mergeFiltered"),
        ]:
            pkeys = getattr(objlist, src)
            if pkeys:
                res[objname][severity][src] = []
                for pkey in sorted(pkeys):
                    obj = objlist.get(pkey)
                    # Fall back on the pkey when the object itself is not available
                    objrepr = pkey if obj is None else repr(obj)
                    res[objname][severity][src].append(objrepr)

    # Remove what was not requested
    for objname in list(res):
        for category in allLevels:
            if category not in levels or (
                not verbose and not res[objname][category]
            ):
                del res[objname][category]

        if not verbose and not res[objname]:
            del res[objname]

    return res

617 

def initsync(self):
    """Send an initsync sequence: the events are generated from the differences
    between the data cache and an empty datasource, without saving nor committing
    anything"""
    blank: Datasource = Datasource(
        schema=self.dm.dataschema, enableTrashbin=False, enableCache=False
    )
    initsyncArgs = {
        "eventCategory": "initsync",
        "data": self.dm.data.cache,
        "cache": blank,
        "save": False,
        "commit": False,
        "sendEvents": True,
    }
    self.generateAndSendEvents(**initsyncArgs)

631 

def generateAndSendEvents(
    self,
    eventCategory: str,
    data: Datasource,
    cache: Datasource,
    save: bool,
    commit: bool,
    sendEvents: bool,
):
    """Generate and send events of specified eventCategory ("base" or "initsync"),
    computed upon differences between specified data and cache.
    If save is True, cache will be saved on disk.
    If sendEvents is True, events will be sent on msgbus.
    If commit and sendEvents are True, the datamodel commit_one (for "base" events
    only) and commit_all methods will be called.
    For "base" events, each processed event is applied to cache.
    For "initsync" events with sendEvents, the sequence is surrounded by an
    "init-start" event (containing current dataschema) and an "init-stop" event.

    Raises ValueError if eventCategory is invalid, and lets FailedToSendEventError
    propagate when an event can't be sent."""

    if eventCategory not in ("base", "initsync"):
        err = f"Specified eventType '{eventCategory}' is invalid"
        __hermes__.logger.critical(err)
        raise ValueError(err)

    with self._msgbus:
        if eventCategory == "initsync" and sendEvents:
            self._msgbus.send(
                Event(
                    evcategory=eventCategory,
                    eventtype="init-start",
                    obj=None,
                    objattrs=self.dm.dataschema.schema,  # Send current schema
                )
            )

        # Loop over each datamodel type to compute diffs
        diffs: dict[str, DiffObject] = {}
        for objtype in data.keys():
            # Generate diff between fresh data and cache
            diff = data[objtype].diffFrom(cache[objtype])
            diffs[objtype] = diff
            if diff:
                __hermes__.logger.info(
                    f"{objtype} have changed: {len(diff.added)} added,"
                    f" {len(diff.modified)} modified,"
                    f" {len(diff.removed)} removed"
                )

        # Process events
        for changeType in ["added", "modified", "removed"]:
            if changeType == "removed":
                # Process removed events in the datamodel reversed declaration order
                objTypes = reversed(data.keys())
            else:
                # Process other events in the datamodel declaration order
                objTypes = data.keys()

            for objtype in objTypes:
                # Passed to event.toString() when logging the event
                secretAttrs = data.schema.secretsAttributesOf(objtype)
                diff = diffs[objtype]
                difflist = diff.dict[changeType]
                # Loop over each diff item of current changeType and create event
                diffitem: DiffObject | DataObject
                for diffitem in difflist:
                    (event, obj) = Event.fromDiffItem(
                        diffitem, eventCategory, changeType
                    )

                    if sendEvents:
                        # Send event
                        try:
                            __hermes__.logger.info(
                                f"Sending {event.toString(secretAttrs)}"
                            )
                            self._msgbus.send(event=event)
                        except FailedToSendEventError as e:
                            # Event not sent
                            # Save the changes already applied to cache before
                            # aborting
                            if save:
                                cache.save()
                            __hermes__.logger.critical(
                                f"Failed to send event. Execution aborted: {str(e)}"
                            )
                            raise

                    # Event sent, validate its changes
                    if eventCategory == "base":
                        if sendEvents and commit:
                            self.dm.commit_one(obj)

                        # Apply the change to cache
                        match event.eventtype:
                            case "added":
                                cache[objtype].append(obj)
                            case "removed":
                                cache[objtype].remove(obj)
                            case "modified":
                                cache[objtype].replace(obj)

                if sendEvents and commit:
                    self.dm.commit_all(objtype)

        if save:
            cache.save()

        if eventCategory == "initsync" and sendEvents:
            self._msgbus.send(
                Event(
                    evcategory=eventCategory,
                    eventtype="init-stop",
                    obj=None,
                    objattrs={},
                )
            )

    self._firstFetchDone = True
    self.notifyErrors()

744 

def notifyErrors(self):
    """Log the current data errors if any, and send a notification mail when they
    differ from the cached ones, that are then replaced"""
    new_errors = self.status(level="error", ignoreUnhandledExceptions=True)

    # Errors are compared (and mailed) through their JSON representation
    new_errstr = json.dumps(new_errors, cls=JSONEncoder, indent=4)
    old_errstr = json.dumps(self._cache.errors, cls=JSONEncoder, indent=4)

    nl = "\n"

    if new_errors:
        __hermes__.logger.error(f"Data errors met: {nl}{new_errstr}")

    if new_errstr == old_errstr:
        # Nothing changed since last notification
        return

    desc = "data errors met" if new_errors else "no more data errors"
    __hermes__.logger.info(desc)
    Email.sendDiff(
        config=self.config,
        contentdesc=desc,
        previous=old_errstr,
        current=new_errstr,
    )
    self._cache.errors = new_errors

775 

def notifyException(self, trace: str | None):
    """Log the specified exception trace if any, and send a notification mail when
    it differs from the cached one, that is then replaced.
    A None trace means that no exception is currently met"""
    if trace:
        __hermes__.logger.critical(f"Unhandled exception: {trace}")

    formerTrace = self._cache.exception
    if formerTrace == trace:
        # Nothing changed since last notification
        return

    desc = "unhandled exception" if trace else "no more unhandled exception"
    __hermes__.logger.info(desc)

    # The mail content is made of strings: None is replaced by an empty string
    Email.sendDiff(
        config=self.config,
        contentdesc=desc,
        previous=formerTrace if formerTrace is not None else "",
        current=trace if trace is not None else "",
    )
    self._cache.exception = trace

796 self._cache.exception = trace