Coverage for clients / datamodel.py: 93%

413 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:11 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from lib.config import HermesConfig 

24 

25from copy import deepcopy 

26from datetime import datetime 

27from jinja2 import StrictUndefined 

28from jinja2.environment import Template 

29from typing import Any 

30 

31from clients.errorqueue import ErrorQueue 

32from lib.datamodel.dataobject import DataObject 

33from lib.datamodel.dataobjectlist import DataObjectList 

34from lib.datamodel.dataschema import Dataschema 

35from lib.datamodel.datasource import Datasource 

36from lib.datamodel.diffobject import DiffObject 

37from lib.datamodel.event import Event 

38from lib.datamodel.serialization import LocalCache 

39from lib.datamodel.jinja import ( 

40 HermesNativeEnvironment, 

41 Jinja, 

42 HermesUnknownVarsInJinjaTemplateError, 

43) 

44 

45 

class InvalidDataError(Exception):
    """Critical-bug marker: raised when the client reaches a state that the
    design guarantees should be impossible (e.g. inconsistent caches)"""

48 

49 

class MissingForeignkeyDatatypeError(Exception):
    """Raised when the client datamodel lacks a data type linked to the server
    data type providing the foreign keys that a client data type requires"""

53 

54 

class Datamodel:
    """Load and build the Datamodel from config, and validates it according to remote
    Dataschema.

    In charge of:
    - handling updates of Datamodel (hermes-client.datamodel changes in config file)
    - handling updates of remote Dataschema (hermes-server.datamodel in server
      config file)
    - converting a remote Event to a local one
    - handling remote and local data caches (remotedata and localdata attributes,
      each of Datasource type)
    """

    def __init__(
        self,
        config: HermesConfig,
    ):
        """Build the datamodel from config"""

        self.unknownRemoteTypes: set[str] = set()
        """List remote types set in client Datamodel, but missing in remote
        Dataschema"""
        # Fixed: was initialized to set(), contradicting the dict annotation and
        # breaking callers iterating .items() before _fillConversionVars() runs
        self.unknownRemoteAttributes: dict[str, set[str]] = {}
        """List remote attributes set in client Datamodel, but missing in remote
        Dataschema. The dict key contains the remote type, the set contains the missing
        attributes"""

        self._config: HermesConfig = config

        self._rawdatamodel: dict[str, Any] = self._config["hermes-client"]["datamodel"]
        """Local datamodel dictionary, as found in config"""

        self._datamodel: dict[str, Any]
        """Local datamodel dictionary, with compiled Jinja templates"""

        self._jinjaenv: HermesNativeEnvironment = HermesNativeEnvironment(
            undefined=StrictUndefined
        )
        if "hermes" in self._config:
            self._jinjaenv.filters |= self._config["hermes"]["plugins"]["attributes"][
                "_jinjafilters"
            ]

        self.remote_schema: Dataschema = Dataschema.loadcachefile("_dataschema")
        """Remote schema"""
        self.local_schema: Dataschema | None = None
        """Local schema"""

        self.remotedata: Datasource | None = None
        """Datasource of remote objects"""
        self.localdata: Datasource | None = None
        """Datasource of local objects"""

        self.remotedata_complete: Datasource | None = None
        """Datasource of remote objects as it should be without error"""
        self.localdata_complete: Datasource | None = None
        """Datasource of local objects as it should be without error"""

        self.errorqueue: ErrorQueue | None = None
        """Queue of Events in error"""

        self.typesmapping: dict[str, list[str]]
        """Mapping of datamodel types: hermes-server type as key, hermes-client types as
        values"""
        self._remote2local: dict[str, dict[str, list[str]]]
        """Mapping with remote type name as key, and dict containing remote attrname as
        key and local attrname as value. Example:
        {
            remote_type_name: {
                remote_attrname1: client_attrname1,
                ...
            },
            ...
        }
        """

        if self.hasRemoteSchema():
            self._mergeWithSchema(self.remote_schema)

    def hasRemoteSchema(self) -> bool:
        """Returns true if remote schema has data"""
        return len(self.remote_schema.schema) != 0

    def diffFrom(self, other: "Datamodel") -> DiffObject:
        """Return DiffObject with differences (attributes names) of current instance
        from another"""
        diff = DiffObject()

        s = self._rawdatamodel.keys()
        o = other._rawdatamodel.keys()
        commonattrs = s & o

        diff.appendRemoved(o - s)
        diff.appendAdded(s - o)

        for k, v in self._rawdatamodel.items():
            if k in commonattrs and DataObject.isDifferent(v, other._rawdatamodel[k]):
                diff.appendModified(k)

        return diff

    def loadLocalData(self):
        """Load or reload localdata and localdata_complete from cache"""
        self.localdata = Datasource(
            schema=self.local_schema, enableTrashbin=True, cacheFilePrefix="__"
        )
        self.localdata.loadFromCache()
        self.localdata_complete = Datasource(
            schema=self.local_schema,
            enableTrashbin=True,
            cacheFilePrefix="__",
            cacheFileSuffix="_complete__",
        )
        self.localdata_complete.loadFromCache()

    def saveLocalData(self):
        """Save localdata and localdata_complete when they're set"""
        if self.localdata is not None:
            self.localdata.save()
        if self.localdata_complete is not None:
            self.localdata_complete.save()

    def loadRemoteData(self):
        """Load or reload remotedata and remotedata_complete from cache"""
        self.remotedata = Datasource(schema=self.remote_schema, enableTrashbin=True)
        self.remotedata.loadFromCache()
        self.remotedata_complete = Datasource(
            schema=self.remote_schema,
            enableTrashbin=True,
            cacheFileSuffix="_complete__",
        )
        self.remotedata_complete.loadFromCache()
        # Keep the error queue pointing at the freshly reloaded datasources
        if self.errorqueue is not None:
            self.errorqueue.updateDatasources(
                self.remotedata,
                self.remotedata_complete,
                self.localdata,
                self.localdata_complete,
            )

    def saveRemoteData(self):
        """Save remotedata and remotedata_complete when they're set"""
        if self.remotedata is not None:
            self.remotedata.save()
        if self.remotedata_complete is not None:
            self.remotedata_complete.save()

    def loadLocalAndRemoteData(self):
        """Load or reload localdata, localdata_complete, remotedata and
        remotedata_complete from cache"""
        self.loadLocalData()
        self.loadRemoteData()

    def saveLocalAndRemoteData(self):
        """Save localdata, localdata_complete, remotedata and remotedata_complete
        from cache when they're set"""
        self.saveLocalData()
        self.saveRemoteData()

    def loadErrorQueue(self):
        """Load or reload error queue from cache"""
        if self.hasRemoteSchema():
            self.errorqueue = ErrorQueue.loadcachefile(
                "_errorqueue",
                typesMapping=self.typesmapping,
                remotedata_complete=self.remotedata_complete,
                remotedata=self.remotedata,
                localdata=self.localdata,
                localdata_complete=self.localdata_complete,
                autoremediate=self._config["hermes-client"]["autoremediation"],
            )
        else:
            # Without a remote schema, typesmapping isn't set yet: no queue
            self.errorqueue = None

    def saveErrorQueue(self):
        """Save error queue to cache"""
        if self.errorqueue is not None:
            self.errorqueue.savecachefile()

    def _mergeWithSchema(self, remote_schema: Dataschema):
        """Build or update the datamodel according to specified remote_schema"""
        prev_remote_schema = self.remote_schema
        self.remote_schema = remote_schema
        self._remote2local = {}

        prev_remote_pkeys, new_remote_pkeys = self._checkForSchemaChanges(
            prev_remote_schema, self.remote_schema
        )

        self._fillDatamodelDict()  # Filled upon config only
        self._fillConversionVars()  # Filled upon config only

        self.local_schema = self._setupLocalSchema()

        # Update pkeys when necessary
        if new_remote_pkeys:
            __hermes__.logger.info("Updating local cache primary keys")

            self.saveLocalAndRemoteData()
            self.saveErrorQueue()

            new_local_pkeys: dict[str, str | tuple[str]] = {}
            l_pkeys_to_add: dict[str, set[str]] = {}
            l_pkeys_to_remove: dict[str, set[str]] = {}
            local_types: dict[str, set[type[DataObject]]] = {}
            for r_objtype in new_remote_pkeys.keys():
                if r_objtype not in self.typesmapping:
                    continue

                # Determine local objtype and its new primary key
                for l_objtype in self.typesmapping[r_objtype]:
                    new_local_pkeys[l_objtype] = self.local_schema.schema[l_objtype][
                        "PRIMARYKEY_ATTRIBUTE"
                    ]

                    # Compute local pkeys to add and to remove for each local data type
                    r_prev_pkeys = prev_remote_pkeys[r_objtype]
                    r_new_pkeys = new_remote_pkeys[r_objtype]
                    if type(r_prev_pkeys) is str:
                        r_prev_pkeys = (r_prev_pkeys,)
                    if type(r_new_pkeys) is str:
                        r_new_pkeys = (r_new_pkeys,)
                    l_prev_pkeys = set([f"_pkey_{pkey}" for pkey in r_prev_pkeys])
                    l_new_pkeys = set([f"_pkey_{pkey}" for pkey in r_new_pkeys])
                    l_pkeys_to_add[l_objtype] = l_new_pkeys - l_prev_pkeys
                    l_pkeys_to_remove[l_objtype] = l_prev_pkeys - l_new_pkeys

                    local_types[l_objtype] = set()

                    # Add new pkey attributes and values to localdata objects
                    for src in (self.localdata, self.localdata_complete):
                        for type_prefix in ("", "trashbin_"):
                            obj: DataObject
                            for obj in src[f"{type_prefix}{l_objtype}"]:
                                if type(obj) not in local_types[l_objtype]:
                                    local_types[l_objtype].add(type(obj))
                                    type(obj).HERMES_ATTRIBUTES |= l_pkeys_to_add[
                                        l_objtype
                                    ]
                                # Get corresponding remote object from cache
                                _, r_obj = Datamodel.getObjectFromCacheOrTrashbin(
                                    self.remotedata_complete, r_objtype, obj.getPKey()
                                )
                                if r_obj is None:
                                    # Should never happen : if so, it's a bug
                                    msg = (
                                        f"BUG ! No matching of local object {repr(obj)}"
                                        " found in remotedata_complete cache. The"
                                        " client is probably broken"
                                    )
                                    __hermes__.logger.critical(msg)
                                    raise InvalidDataError(msg)

                                for pkey in r_new_pkeys:
                                    try:
                                        # Get pkey value from remote object
                                        value = getattr(r_obj, pkey)
                                    except AttributeError:
                                        # Should never happen : if so, it's a bug
                                        msg = (
                                            "BUG ! No value exist in remote cache for"
                                            f" attribute '{pkey}' of object {r_obj}."
                                            " The client is probably broken"
                                        )
                                        __hermes__.logger.critical(msg)
                                        raise InvalidDataError(msg)
                                    # Store pkey value to local object
                                    setattr(obj, f"_pkey_{pkey}", value)

            # Update PRIMARYKEY_ATTRIBUTE of each local type
            for l_objtype, l_types in local_types.items():
                for l_type in l_types:
                    l_type.PRIMARYKEY_ATTRIBUTE = new_local_pkeys[l_objtype]

                # Remove previous pkey attributes that are not used anymore from
                # localdata objects
                for src in (self.localdata, self.localdata_complete):
                    for type_prefix in ("", "trashbin_"):
                        obj: DataObject
                        for obj in src[f"{type_prefix}{l_objtype}"]:
                            for pkey in l_pkeys_to_remove[l_objtype]:
                                try:
                                    delattr(obj, pkey)
                                except AttributeError:
                                    pass

            # Remove previous pkey attributes that are not used anymore from
            # HERMES_ATTRIBUTES of each local type
            for l_objtype, l_types in local_types.items():
                for l_type in l_types:
                    l_type.HERMES_ATTRIBUTES -= l_pkeys_to_remove[l_objtype]

            self.saveLocalData()

            __hermes__.logger.info("Updating changed primary keys in error queue")
            self.errorqueue.updatePrimaryKeys(
                new_remote_pkeys,
                self.remotedata,
                self.remotedata_complete,
                new_local_pkeys,
                self.localdata,
                self.localdata_complete,
            )

            # Save and reload error queue
            self.saveErrorQueue()
            self.loadErrorQueue()

        # Load local and remote Datasource caches
        self.loadLocalAndRemoteData()

    def updateSchema(self, remote_schema: Dataschema):
        """Build or update the datamodel according to specified remote_schema.
        Data caches (localdata, localdata_complete, remotedata and remotedata_complete)
        will be saved and reloaded to be updated according to new schema.
        Remote and local schemas caches will be saved.
        """
        # Save current data before updating schema and reloading them
        self.saveLocalAndRemoteData()
        self._mergeWithSchema(remote_schema)
        self.remote_schema.savecachefile()

    def forcePurgeOfTrashedObjectsWithoutNewPkeys(
        self, oldschema: Dataschema | None, newschema: Dataschema
    ) -> bool:
        """On schema update, when primary key have changed, the trashed objects may not
        contain the value of the new primary key attribute(s). This function will
        change the trashbin timestamp of all those objects to force their removal.
        Returns True if a trashbin purge is required, False otherwise
        """
        # Note: return annotation fixed to bool, matching every return statement
        isTrashbinPurgeRequired: bool = False
        if oldschema is None:
            return False

        diff = newschema.diffFrom(oldschema)

        if not (diff and diff.modified):
            return False

        old: dict[str, Any] = oldschema.schema
        new: dict[str, Any] = newschema.schema
        for objtype in diff.modified:
            if objtype not in self.typesmapping:
                continue
            npkey = new[objtype]["PRIMARYKEY_ATTRIBUTE"]
            opkey = old[objtype]["PRIMARYKEY_ATTRIBUTE"]
            if not DataObject.isDifferent(npkey, opkey):
                continue
            npkeys = (npkey,) if type(npkey) is str else npkey
            obj: DataObject
            for obj in self.remotedata[f"trashbin_{objtype}"]:
                for pkey in npkeys:
                    if not hasattr(obj, pkey):
                        __hermes__.logger.warning(
                            f"Object {repr(obj)} of type '{objtype}' in trashbin will"
                            " be purged, as it doesn't have the new primary key value"
                        )
                        isTrashbinPurgeRequired = True
                        # Backdate the trashbin timestamp so the purge removes it
                        obj._trashbin_timestamp = datetime(year=1, month=1, day=1)
                        break
        return isTrashbinPurgeRequired

    def _checkForSchemaChanges(
        self, oldschema: Dataschema | None, newschema: Dataschema
    ) -> tuple[dict[str, str | tuple[str]], dict[str, str | tuple[str]]]:
        """Returns a tuple of two dicts :
        - first dict with remote types as key, and previous remote primary key
          attribute as value
        - second dict with remote types as key, and new remote primary key attribute as
          value
        """
        previouspkeys = {}
        newpkeys = {}
        if oldschema is None:
            return (previouspkeys, newpkeys)

        diff = newschema.diffFrom(oldschema)

        if diff:
            old: dict[str, Any] = oldschema.schema
            new: dict[str, Any] = newschema.schema

            if diff.added:
                __hermes__.logger.info(f"Types added in Dataschema: {diff.added}")

            if diff.removed:
                __hermes__.logger.info(
                    f"Types removed from Dataschema: {diff.removed},"
                    " purging cache files"
                )
                self.purgeOldCacheFiles(diff.removed)

            if diff.modified:
                for objtype in diff.modified:
                    n = new[objtype]
                    o = old[objtype]
                    # HERMES_ATTRIBUTES
                    added = n["HERMES_ATTRIBUTES"] - o["HERMES_ATTRIBUTES"]
                    removed = o["HERMES_ATTRIBUTES"] - n["HERMES_ATTRIBUTES"]
                    if added:
                        __hermes__.logger.info(
                            f"New attributes in dataschema type '{objtype}': {added}"
                        )
                    if removed:
                        __hermes__.logger.info(
                            f"Removed attributes from dataschema type '{objtype}':"
                            f" {removed}"
                        )

                    # SECRETS_ATTRIBUTES
                    added = n["SECRETS_ATTRIBUTES"] - o["SECRETS_ATTRIBUTES"]
                    removed = o["SECRETS_ATTRIBUTES"] - n["SECRETS_ATTRIBUTES"]
                    if added:
                        __hermes__.logger.info(
                            f"New secrets attributes in dataschema type '{objtype}':"
                            f" {added}"
                        )
                        # We need to purge attribute from cache: as cache is loaded with
                        # attribute set up as SECRET, we just have to save the cache
                        # (attr won't be saved anymore, as it's SECRET) and reload
                        # cache to "forget" values loaded from previous cache
                        self.saveRemoteData()
                        self.loadRemoteData()
                    if removed:
                        __hermes__.logger.info(
                            "Removed secrets attributes from dataschema type"
                            f" '{objtype}': {removed}"
                        )

                    # PRIMARYKEY_ATTRIBUTE
                    npkey = n["PRIMARYKEY_ATTRIBUTE"]
                    opkey = o["PRIMARYKEY_ATTRIBUTE"]
                    if DataObject.isDifferent(npkey, opkey):
                        previouspkeys[objtype] = opkey
                        newpkeys[objtype] = npkey
                        __hermes__.logger.info(
                            "New primary key attribute in dataschema type"
                            f" '{objtype}': {npkey}"
                        )

        return (previouspkeys, newpkeys)

    def convertEventToLocal(
        self,
        event: Event,
        new_obj: DataObject | None = None,
        allowEmptyEvent: bool = False,
        onlyForLocalType: str | None = None,
    ) -> list[Event | None]:
        """Convert specified remote event to local ones.
        If new_obj is provided, it must contains all the new remote object values,
        and will only be used to render Jinja Templates.
        Returns a None value if local event doesn't contains any attribute and
        allowEmptyEvent is False"""
        if event.objtype not in self.typesmapping:
            __hermes__.logger.debug(
                f"Unknown {event.objtype=}. Known are {self.typesmapping}"
            )
            return [None]  # Unknown type

        res: list[Event | None] = []

        for objtype in self.typesmapping[event.objtype]:
            if onlyForLocalType is not None and objtype != onlyForLocalType:
                continue

            # Handle that event.objattrs is 1 depth deeper for "modified" events
            if event.eventtype == "modified":
                sources = ("added", "modified", "removed")
                objattrs = {"added": {}, "modified": {}, "removed": {}}
            else:
                sources = (None,)
                objattrs = {}

            hasContent: bool = False
            for source in sources:
                if source is None:
                    src = event.objattrs
                else:
                    src = event.objattrs[source]

                # Hack to handle Jinja templates containing only static data
                if (
                    None in self._remote2local[event.objtype]
                    and event.eventtype == "added"
                ):
                    loopsrc = src.copy()
                    loopsrc[None] = None
                else:
                    loopsrc = src

                for k, v in loopsrc.items():
                    if k in self._remote2local[event.objtype]:
                        for dest in self._remote2local[event.objtype][k]:
                            remoteattr = self._datamodel[objtype]["attrsmapping"][dest]
                            if isinstance(
                                remoteattr, Template
                            ):  # May be a compiled Jinja Template
                                if new_obj is None:
                                    val = remoteattr.render(src)
                                else:
                                    # We must provide all new object vars values to
                                    # render a Jinja Template computed from several
                                    # vars, in case of "modified" event changing the
                                    # value of only one var value used by the template.
                                    # The event objattrs won't be enough in this
                                    # specific case.
                                    val = remoteattr.render(new_obj.toNative())

                                if type(val) is list:
                                    val = [v for v in val if v is not None]

                                if val is None or val == []:
                                    # No value
                                    if event.eventtype == "modified":
                                        objattrs["removed"].update({dest: val})
                                    elif event.eventtype == "removed":
                                        objattrs.update({dest: val})
                                else:
                                    # In modified events, we have to determine if the
                                    # attribute is added or modified
                                    if event.eventtype == "modified":
                                        _, cachedObj = (
                                            self.getObjectFromCacheOrTrashbin(
                                                self.localdata_complete,
                                                objtype,
                                                event.objpkey,
                                            )
                                        )

                                        if cachedObj is not None and hasattr(
                                            cachedObj, dest
                                        ):
                                            # Ensure the value has changed
                                            previousVal = getattr(cachedObj, dest)
                                            if DataObject.isDifferent(previousVal, val):
                                                objattrs["modified"].update({dest: val})
                                                hasContent = True
                                        else:
                                            # Attr is added
                                            objattrs["added"].update({dest: val})
                                            hasContent = True
                                    else:
                                        objattrs.update({dest: val})
                                        hasContent = True
                            else:
                                if source is None:
                                    objattrs.update({dest: v})
                                else:
                                    objattrs[source].update({dest: v})
                                hasContent = True

            resev = None
            if hasContent or allowEmptyEvent or event.eventtype == "removed":
                resev = Event(
                    evcategory=event.evcategory,
                    eventtype=event.eventtype,
                    objattrs=objattrs,
                )
                resev.objtype = objtype
                resev.objpkey = event.objpkey
                resev.objrepr = str(resev.objpkey)
                resev.timestamp = event.timestamp
                resev.step = event.step
                resev.isPartiallyProcessed = event.isPartiallyProcessed
            res.append(resev)
        return res

    def createLocalDataobject(
        self, objtype: str, objattrs: dict[str, Any]
    ) -> DataObject:
        """Returns instance of specified local Dataobject type from specified
        attributes"""
        return self.createDataobject(self.local_schema, objtype, objattrs)

    def createRemoteDataobject(
        self, objtype: str, objattrs: dict[str, Any]
    ) -> DataObject:
        """Returns instance of specified remote Dataobject type from specified
        attributes"""
        return self.createDataobject(self.remote_schema, objtype, objattrs)

    @staticmethod
    def createDataobject(
        schema: Dataschema, objtype: str, objattrs: dict[str, Any]
    ) -> DataObject:
        """Returns instance of specified Dataobject type from specified attributes"""
        return schema.objectTypes[objtype](from_json_dict=objattrs)

    @staticmethod
    def getUpdatedObject(obj: DataObject, objattrs: dict[str, Any]) -> DataObject:
        """Return a deepcopy of specified obj, with its attributes updated upon
        specified objattrs dict from Event"""
        newobj = deepcopy(obj)

        # Update newobj attributes
        for attrname, value in objattrs["added"].items():
            setattr(newobj, attrname, value)  # Add new attributes
        for attrname, value in objattrs["modified"].items():
            setattr(newobj, attrname, value)  # Update existing attributes
        for attrname, value in objattrs["removed"].items():
            if hasattr(newobj, attrname):
                delattr(newobj, attrname)  # Delete existing attributes

        return newobj

    def convertDataObjectToLocal(
        self, obj: DataObject, expectedLocalObjtype: str
    ) -> DataObject:
        """Convert specified Dataobject (remote) to expected local one"""
        tmpEvent = self.convertEventToLocal(
            Event("conversion", "added", obj, obj.toNative()),
            onlyForLocalType=expectedLocalObjtype,
        )[0]
        return self.local_schema.objectTypes[expectedLocalObjtype](
            from_json_dict=tmpEvent.objattrs
        )

    def convertDataObjectListToLocal(
        self, remoteobjtype: str, objlist: DataObjectList, expectedLocalObjtype: str
    ) -> DataObjectList:
        """Convert specified DataObjectList (remote) to expected local one"""
        localobjs = [
            self.convertDataObjectToLocal(obj, expectedLocalObjtype) for obj in objlist
        ]
        return self.local_schema.objectlistTypes[expectedLocalObjtype](
            objlist=localobjs
        )

    @staticmethod
    def purgeOldCacheFiles(
        objtypes: list[str] | set[str],
        cacheFilePrefix: str = "",
        cacheFileSuffix: str = "",
    ):
        """Delete all cache files of specified objtypes"""
        for objtype in objtypes:
            LocalCache.deleteAllCacheFiles(
                f"{cacheFilePrefix}{objtype}{cacheFileSuffix}"
            )
            LocalCache.deleteAllCacheFiles(
                f"{cacheFilePrefix}{objtype}_complete__{cacheFileSuffix}"
            )
            LocalCache.deleteAllCacheFiles(
                f"{cacheFilePrefix}trashbin_{objtype}{cacheFileSuffix}"
            )
            LocalCache.deleteAllCacheFiles(
                f"{cacheFilePrefix}trashbin_{objtype}_complete__{cacheFileSuffix}"
            )

    def _fillDatamodelDict(self):
        """Build self._datamodel from config, compiling each type's 'toString'
        Jinja template and validating that its variables are known local attrs"""
        # Fill the datamodel dict
        self._datamodel = {}
        for objtype in self._config["hermes-client"]["datamodel"]:
            self._datamodel[objtype] = {}
            for k, v in self._config["hermes-client"]["datamodel"][objtype].items():
                if k == "attrsmapping":
                    self._datamodel[objtype][k] = dict(v)
                elif k == "toString" and v is not None:
                    # Compile toString's jinja template
                    jinjavars = set()
                    self._datamodel[objtype][k] = Jinja.compileIfJinjaTemplate(
                        v,
                        jinjavars,
                        self._jinjaenv,
                        f"hermes-client.datamodel.{objtype}.toString",
                        False,
                        False,
                    )
                    # Ensure jinja vars are known local attrs
                    unknownattrs = (
                        jinjavars
                        - self._config["hermes-client"]["datamodel"][objtype][
                            "attrsmapping"
                        ].keys()
                    )
                    if unknownattrs:
                        raise HermesUnknownVarsInJinjaTemplateError(
                            "Unknown attributes met in 'hermes-client.datamodel"
                            f".{objtype}.toString' jinja template: {unknownattrs}"
                        )
                else:
                    self._datamodel[objtype][k] = v

    def _fillConversionVars(self):
        """Build self.typesmapping and self._remote2local from self._datamodel,
        and fill the unknownRemoteTypes / unknownRemoteAttributes consistency
        reports against the remote schema"""
        # Fill the types mapping (remote as key, list of local as value)
        typesmapping: dict[str, list[str]] = {}
        for k, v in self._datamodel.items():
            if v["hermesType"] not in typesmapping:
                typesmapping[v["hermesType"]] = []
            typesmapping[v["hermesType"]].append(k)

        # Types consistency check
        self.unknownRemoteTypes = typesmapping.keys() - self.remote_schema.schema.keys()
        if self.unknownRemoteTypes:
            for t in self.unknownRemoteTypes:
                del typesmapping[t]

        # Reorder typemapping to respect the order specified on remote schema
        self.typesmapping = {}
        for rtype in self.remote_schema.schema:
            if rtype in typesmapping:
                self.typesmapping[rtype] = typesmapping[rtype]

        # Fill the remote2local mapping dict
        for objsettings in self._datamodel.values():
            remote_objtype = objsettings["hermesType"]
            self._remote2local[remote_objtype] = {}
            # Add primary keys to mapping to ensure they're always available
            if remote_objtype in self.remote_schema.schema:
                pkeys = self.remote_schema.schema[remote_objtype][
                    "PRIMARYKEY_ATTRIBUTE"
                ]
                if type(pkeys) not in [list, tuple]:
                    pkeys = [pkeys]
                for pkey in pkeys:
                    objsettings["attrsmapping"][f"_pkey_{pkey}"] = pkey

            for local_attr, remote_attr in objsettings["attrsmapping"].items():
                remote_vars = set()
                objsettings["attrsmapping"][local_attr] = Jinja.compileIfJinjaTemplate(
                    remote_attr,
                    remote_vars,
                    self._jinjaenv,
                    f"hermes-client.datamodel.{remote_objtype}.attrsmapping",
                    False,
                    False,
                )
                if len(remote_vars) == 0:
                    # Hack to handle Jinja templates containing only static data
                    remote_vars.add(None)

                for remote_var in remote_vars:
                    # As many local attrs can be mapped on a same remote attr,
                    # store the mapping in a list
                    self._remote2local[remote_objtype].setdefault(
                        remote_var, []
                    ).append(local_attr)

        # Attributes consistency check
        self.unknownRemoteAttributes = {}
        for rtype in self.typesmapping:
            diff = (
                self._remote2local[rtype].keys()
                - self.remote_schema.schema[rtype]["HERMES_ATTRIBUTES"]
                - set([None])  # Ignore Jinja templates with static data only
            )
            if diff:
                self.unknownRemoteAttributes[rtype] = diff

    def _setupLocalSchema(self) -> Dataschema:
        """Build local schema from local datamodel and remote schema"""
        rschema: dict[str, Any] = self.remote_schema.schema
        schema: dict[str, Any] = {}
        for objtypes in self.typesmapping.values():
            for objtype in objtypes:
                remote_objtype = self._datamodel[objtype]["hermesType"]

                secrets = []
                for attr in rschema[remote_objtype]["SECRETS_ATTRIBUTES"]:
                    v = self._remote2local[remote_objtype].get(attr)
                    if v is not None:
                        secrets.extend(v)

                # Add primary keys to mapping to ensure they're always there
                pkeys = self.remote_schema.schema[remote_objtype][
                    "PRIMARYKEY_ATTRIBUTE"
                ]
                if type(pkeys) in [list, tuple]:
                    pkey = tuple([f"_pkey_{pkey}" for pkey in pkeys])
                else:
                    pkey = f"_pkey_{pkeys}"

                # Convert foreign keys dict
                fkeys: dict[str, list[str]] = {}
                for from_attr, (to_obj, to_attr) in self.remote_schema.schema[
                    remote_objtype
                ]["FOREIGN_KEYS"].items():
                    try:
                        # In current implementation, foreign key are always
                        # single primary keys (not a tuple)
                        fkeys[f"_pkey_{from_attr}"] = [
                            self.typesmapping[to_obj][0],
                            f"_pkey_{to_attr}",
                        ]
                    except KeyError:
                        msg = (
                            "Config error : client datamodel requires a data type"
                            f" linked to server data type '{to_obj}', that will provide"
                            " the foreign keys required by client data type"
                            f" '{objtype}'"
                        )
                        __hermes__.logger.critical(msg)
                        raise MissingForeignkeyDatatypeError(msg) from None

                schema[objtype] = {
                    "HERMES_ATTRIBUTES": set(
                        self._datamodel[objtype]["attrsmapping"].keys()
                    ),
                    "SECRETS_ATTRIBUTES": set(secrets),
                    "CACHEONLY_ATTRIBUTES": set(),
                    "LOCAL_ATTRIBUTES": set(),
                    "PRIMARYKEY_ATTRIBUTE": pkey,
                    "FOREIGN_KEYS": fkeys,
                    "TOSTRING": self._datamodel[objtype]["toString"],
                }

        res = Dataschema(schema)
        return res

    @staticmethod
    def getObjectFromCacheOrTrashbin(
        ds: Datasource, objtype: str, objpkey: Any
    ) -> tuple[DataObjectList | None, DataObject | None]:
        """Look for objpkey in maincache and trashbin of objtype in specified ds.
        If found, returns a tuple with a the DataObjectList where the object is, and the
        object itself. Otherwise, returns (None, None)"""
        src: DataObjectList
        obj: DataObject | None
        for src in (ds[objtype], ds[f"trashbin_{objtype}"]):
            obj = src.get(objpkey)
            if obj is not None:
                return (src, obj)
        return (None, None)