Coverage for clients/datamodel.py: 92%

398 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from lib.config import HermesConfig 

24 

25from copy import deepcopy 

26from datetime import datetime 

27from jinja2 import StrictUndefined 

28from jinja2.environment import Template 

29from typing import Any 

30 

31from clients.errorqueue import ErrorQueue 

32from lib.datamodel.dataobject import DataObject 

33from lib.datamodel.dataobjectlist import DataObjectList 

34from lib.datamodel.dataschema import Dataschema 

35from lib.datamodel.datasource import Datasource 

36from lib.datamodel.diffobject import DiffObject 

37from lib.datamodel.event import Event 

38from lib.datamodel.serialization import LocalCache 

39from lib.datamodel.jinja import ( 

40 HermesNativeEnvironment, 

41 Jinja, 

42 HermesUnknownVarsInJinjaTemplateError, 

43) 

44 

45 

class InvalidDataError(Exception):
    """Signals an impossible state was reached: indicates a critical client bug."""

48 

49 

class Datamodel:
    """Load and build the Datamodel from config, and validates it according to remote
    Dataschema.

    In charge of:
    - handling updates of Datamodel (hermes-client.datamodel changes in config file)
    - handling updates of remote Dataschema (hermes-server.datamodel in server
      config file)
    - converting a remote Event to a local one
    - handling remote and local data caches (remotedata and localdata attributes,
      each of Datasource type)
    """

    def __init__(
        self,
        config: HermesConfig,
    ):
        """Build the datamodel from config"""

        # Remote types set in client Datamodel, but missing in remote Dataschema
        self.unknownRemoteTypes: set[str] = set()

        # Remote attributes set in client Datamodel, but missing in remote
        # Dataschema. The dict key contains the remote type, the set contains the
        # missing attributes.
        # BUGFIX: was initialized to set(), contradicting the dict annotation and
        # the dict usage in _fillConversionVars()
        self.unknownRemoteAttributes: dict[str, set[str]] = {}

        self._config: HermesConfig = config

        # Local datamodel dictionary, as found in config
        self._rawdatamodel: dict[str, Any] = self._config["hermes-client"]["datamodel"]

        # Local datamodel dictionary, with compiled Jinja templates
        # (filled later by _fillDatamodelDict)
        self._datamodel: dict[str, Any]

        self._jinjaenv: HermesNativeEnvironment = HermesNativeEnvironment(
            undefined=StrictUndefined
        )
        if "hermes" in self._config:
            self._jinjaenv.filters |= self._config["hermes"]["plugins"]["attributes"][
                "_jinjafilters"
            ]

        # Remote schema, restored from local cache file
        self.remote_schema: Dataschema = Dataschema.loadcachefile("_dataschema")
        # Local schema (built in _mergeWithSchema)
        self.local_schema: Dataschema | None = None

        # Datasource of remote objects
        self.remotedata: Datasource | None = None
        # Datasource of local objects
        self.localdata: Datasource | None = None

        # Datasource of remote objects as it should be without error
        self.remotedata_complete: Datasource | None = None
        # Datasource of local objects as it should be without error
        self.localdata_complete: Datasource | None = None

        # Queue of Events in error
        self.errorqueue: ErrorQueue | None = None

        # Mapping of datamodel types: hermes-server type as key, hermes-client type
        # as value (filled in _fillConversionVars)
        self.typesmapping: dict[str, str]
        # Mapping with remote type name as key, and dict containing remote attrname
        # as key and local attrnames list as value. Example:
        # {
        #     remote_type_name: {
        #         remote_attrname1: [client_attrname1, ...],
        #         ...
        #     },
        #     ...
        # }
        self._remote2local: dict[str, dict[str, list[str]]]

        # Datamodel can only be built once the server's schema has been received
        if self.hasRemoteSchema():
            self._mergeWithSchema(self.remote_schema)

128 

129 def hasRemoteSchema(self) -> bool: 

130 """Returns true if remote schema has data""" 

131 return len(self.remote_schema.schema) != 0 

132 

133 def diffFrom(self, other: "Datamodel") -> DiffObject: 

134 """Return DiffObject with differences (attributes names) of current instance 

135 from another""" 

136 diff = DiffObject() 

137 

138 s = self._rawdatamodel.keys() 

139 o = other._rawdatamodel.keys() 

140 commonattrs = s & o 

141 

142 diff.appendRemoved(o - s) 

143 diff.appendAdded(s - o) 

144 

145 for k, v in self._rawdatamodel.items(): 

146 if k in commonattrs and DataObject.isDifferent(v, other._rawdatamodel[k]): 

147 diff.appendModified(k) 

148 

149 return diff 

150 

151 def loadLocalData(self): 

152 """Load or reload localdata and localdata_complete from cache""" 

153 self.localdata = Datasource( 

154 schema=self.local_schema, enableTrashbin=True, cacheFilePrefix="__" 

155 ) 

156 self.localdata.loadFromCache() 

157 self.localdata_complete = Datasource( 

158 schema=self.local_schema, 

159 enableTrashbin=True, 

160 cacheFilePrefix="__", 

161 cacheFileSuffix="_complete__", 

162 ) 

163 self.localdata_complete.loadFromCache() 

164 

165 def saveLocalData(self): 

166 """Save localdata and localdata_complete when they're set""" 

167 if self.localdata is not None: 

168 self.localdata.save() 

169 if self.localdata_complete is not None: 

170 self.localdata_complete.save() 

171 

172 def loadRemoteData(self): 

173 """Load or reload remotedata and remotedata_complete from cache""" 

174 self.remotedata = Datasource(schema=self.remote_schema, enableTrashbin=True) 

175 self.remotedata.loadFromCache() 

176 self.remotedata_complete = Datasource( 

177 schema=self.remote_schema, 

178 enableTrashbin=True, 

179 cacheFileSuffix="_complete__", 

180 ) 

181 self.remotedata_complete.loadFromCache() 

182 if self.errorqueue is not None: 

183 self.errorqueue.updateDatasources( 

184 self.remotedata, 

185 self.remotedata_complete, 

186 self.localdata, 

187 self.localdata_complete, 

188 ) 

189 

190 def saveRemoteData(self): 

191 """Save remotedata and remotedata_complete when they're set""" 

192 if self.remotedata is not None: 

193 self.remotedata.save() 

194 if self.remotedata_complete is not None: 

195 self.remotedata_complete.save() 

196 

197 def loadLocalAndRemoteData(self): 

198 """Load or reload localdata, localdata_complete, remotedata and 

199 remotedata_complete from cache""" 

200 self.loadLocalData() 

201 self.loadRemoteData() 

202 

203 def saveLocalAndRemoteData(self): 

204 """Save localdata, localdata_complete, remotedata and remotedata_complete 

205 from cache when they're set""" 

206 self.saveLocalData() 

207 self.saveRemoteData() 

208 

209 def loadErrorQueue(self): 

210 """Load or reload error queue from cache""" 

211 if self.hasRemoteSchema(): 

212 self.errorqueue = ErrorQueue.loadcachefile( 

213 "_errorqueue", 

214 typesMapping=self.typesmapping, 

215 remotedata_complete=self.remotedata_complete, 

216 remotedata=self.remotedata, 

217 localdata=self.localdata, 

218 localdata_complete=self.localdata_complete, 

219 autoremediate=self._config["hermes-client"]["autoremediation"], 

220 ) 

221 else: 

222 self.errorqueue = None 

223 

224 def saveErrorQueue(self): 

225 """Save error queue to cache""" 

226 if self.errorqueue is not None: 

227 self.errorqueue.savecachefile() 

228 

    def _mergeWithSchema(self, remote_schema: Dataschema):
        """Build or update the datamodel according to specified remote_schema.

        Rebuilds the local schema, and when some remote primary keys changed,
        propagates the new `_pkey_*` attributes/values to every cached local object
        before reloading the error queue and the data caches.
        """
        prev_remote_schema = self.remote_schema
        self.remote_schema = remote_schema
        self._remote2local = {}

        # Returns ({remote type: previous pkey}, {remote type: new pkey}) for the
        # types whose primary key changed between the two schemas
        prev_remote_pkeys, new_remote_pkeys = self._checkForSchemaChanges(
            prev_remote_schema, self.remote_schema
        )

        self._fillDatamodelDict()  # Filled upon config only
        self._fillConversionVars()  # Filled upon config only

        self.local_schema = self._setupLocalSchema()

        # Update pkeys when necessary
        if new_remote_pkeys:
            __hermes__.logger.info("Updating local cache primary keys")

            # Persist current state before rewriting pkey attributes
            self.saveLocalAndRemoteData()
            self.saveErrorQueue()

            new_local_pkeys: dict[str, str | tuple[str]] = {}
            l_pkeys_to_add: dict[str, set[str]] = {}
            l_pkeys_to_remove: dict[str, set[str]] = {}
            local_types: dict[str, set[type[DataObject]]] = {}
            for r_objtype in new_remote_pkeys.keys():
                if r_objtype not in self.typesmapping:
                    # Type not mapped in client datamodel: nothing cached locally
                    continue

                # Determine local objtype and its new primary key
                l_objtype = self.typesmapping[r_objtype]
                new_local_pkeys[l_objtype] = self.local_schema.schema[l_objtype][
                    "PRIMARYKEY_ATTRIBUTE"
                ]

                # Compute local pkeys to add and to remove for each local data type
                r_prev_pkeys = prev_remote_pkeys[r_objtype]
                r_new_pkeys = new_remote_pkeys[r_objtype]
                # Normalize single pkeys (str) to tuples
                if type(r_prev_pkeys) is str:
                    r_prev_pkeys = (r_prev_pkeys,)
                if type(r_new_pkeys) is str:
                    r_new_pkeys = (r_new_pkeys,)
                l_prev_pkeys = set([f"_pkey_{pkey}" for pkey in r_prev_pkeys])
                l_new_pkeys = set([f"_pkey_{pkey}" for pkey in r_new_pkeys])
                l_pkeys_to_add[l_objtype] = l_new_pkeys - l_prev_pkeys
                l_pkeys_to_remove[l_objtype] = l_prev_pkeys - l_new_pkeys

                local_types[l_objtype] = set()

                # Add new pkey attributes and values to localdata objects
                for src in (self.localdata, self.localdata_complete):
                    for type_prefix in ("", "trashbin_"):
                        obj: DataObject
                        for obj in src[f"{type_prefix}{l_objtype}"]:
                            if type(obj) not in local_types[l_objtype]:
                                # First object of this concrete class: register it
                                # and extend its class-level attribute set once
                                local_types[l_objtype].add(type(obj))
                                type(obj).HERMES_ATTRIBUTES |= l_pkeys_to_add[
                                    l_objtype
                                ]
                            # Get corresponding remote object from cache
                            (_, r_obj) = Datamodel.getObjectFromCacheOrTrashbin(
                                self.remotedata_complete, r_objtype, obj.getPKey()
                            )
                            if r_obj is None:
                                # Should never happen : if so, it's a bug
                                msg = (
                                    f"BUG ! No matching of local object {repr(obj)}"
                                    " found in remotedata_complete cache. The client is"
                                    " probably broken"
                                )
                                __hermes__.logger.critical(msg)
                                raise InvalidDataError(msg)

                            for pkey in r_new_pkeys:
                                try:
                                    # Get pkey value from remote object
                                    value = getattr(r_obj, pkey)
                                except AttributeError:
                                    # Should never happen : if so, it's a bug
                                    msg = (
                                        "BUG ! No value exist in remote cache for"
                                        f" attribute '{pkey}' of object {r_obj}. The"
                                        " client is probably broken"
                                    )
                                    __hermes__.logger.critical(msg)
                                    raise InvalidDataError(msg)
                                # Store pkey value to local object
                                setattr(obj, f"_pkey_{pkey}", value)

            # Update PRIMARYKEY_ATTRIBUTE of each local type
            for l_objtype, l_types in local_types.items():
                for l_type in l_types:
                    l_type.PRIMARYKEY_ATTRIBUTE = new_local_pkeys[l_objtype]

                # Remove previous pkey attributes that are not used anymore from
                # localdata objects
                # NOTE(review): nesting reconstructed from a coverage dump — this
                # loop is assumed to run per l_objtype; confirm against upstream
                for src in (self.localdata, self.localdata_complete):
                    for type_prefix in ("", "trashbin_"):
                        obj: DataObject
                        for obj in src[f"{type_prefix}{l_objtype}"]:
                            for pkey in l_pkeys_to_remove[l_objtype]:
                                try:
                                    delattr(obj, pkey)
                                except AttributeError:
                                    pass

            # Remove previous pkey attributes that are not used anymore from
            # HERMES_ATTRIBUTES of each local type
            for l_objtype, l_types in local_types.items():
                for l_type in l_types:
                    l_type.HERMES_ATTRIBUTES -= l_pkeys_to_remove[l_objtype]

            self.saveLocalData()

            __hermes__.logger.info("Updating changed primary keys in error queue")
            self.errorqueue.updatePrimaryKeys(
                new_remote_pkeys,
                self.remotedata,
                self.remotedata_complete,
                new_local_pkeys,
                self.localdata,
                self.localdata_complete,
            )

            # Save and reload error queue
            self.saveErrorQueue()
            self.loadErrorQueue()

        # Load local and remote Datasource caches
        self.loadLocalAndRemoteData()

358 

359 def updateSchema(self, remote_schema: Dataschema): 

360 """Build or update the datamodel according to specified remote_schema. 

361 Data caches (locadata, locadata_complete, remotedata and remotedata_complete) 

362 will be saved and reloaded to be updated according to new schema. 

363 Remote and local schemas caches will be saved. 

364 """ 

365 # Save current data before updating schema and reloading them 

366 self.saveLocalAndRemoteData() 

367 self._mergeWithSchema(remote_schema) 

368 self.remote_schema.savecachefile() 

369 

370 def forcePurgeOfTrashedObjectsWithoutNewPkeys( 

371 self, oldschema: Dataschema | None, newschema: Dataschema 

372 ) -> dict[str, set[Any]]: 

373 """On schema update, when primary key have changed, the trashed objects may not 

374 contain the value of the new primary key attribute(s). This function will 

375 change the trashbin timestamp of all those objects to force their removal. 

376 Returns True if a trashbin purge is required, False otherwise 

377 """ 

378 isTrashbinPurgeRequired: bool = False 

379 if oldschema is None: 

380 return False 

381 

382 diff = newschema.diffFrom(oldschema) 

383 

384 if not (diff and diff.modified): 

385 return False 

386 

387 old: dict[str, Any] = oldschema.schema 

388 new: dict[str, Any] = newschema.schema 

389 for objtype in diff.modified: 

390 if objtype not in self.typesmapping: 

391 continue 

392 npkey = new[objtype]["PRIMARYKEY_ATTRIBUTE"] 

393 opkey = old[objtype]["PRIMARYKEY_ATTRIBUTE"] 

394 if not DataObject.isDifferent(npkey, opkey): 

395 continue 

396 npkeys = (npkey,) if type(npkey) is str else npkey 

397 obj: DataObject 

398 for obj in self.remotedata[f"trashbin_{objtype}"]: 

399 for pkey in npkeys: 

400 if not hasattr(obj, pkey): 

401 __hermes__.logger.warning( 

402 f"Object {repr(obj)} of type '{objtype}' in trashbin will" 

403 " be purged, as it doesn't have the new primary key value" 

404 ) 

405 isTrashbinPurgeRequired = True 

406 obj._trashbin_timestamp = datetime(year=1, month=1, day=1) 

407 break 

408 return isTrashbinPurgeRequired 

409 

    def _checkForSchemaChanges(
        self, oldschema: Dataschema | None, newschema: Dataschema
    ) -> tuple[dict[str, str | tuple[str]], dict[str, str | tuple[str]]]:
        """Compare two remote schemas, log every difference, purge cache files of
        removed types, and reload remote data when new secrets attributes appear.

        Returns a tuple of two dicts:
        - first dict with remote types as key, and previous remote primary key
          attribute as value
        - second dict with remote types as key, and new remote primary key attribute
          as value
        Both dicts only contain the types whose primary key actually changed; both
        are empty when oldschema is None.
        """
        previouspkeys = {}
        newpkeys = {}
        if oldschema is None:
            # First schema reception: nothing to compare against
            return (previouspkeys, newpkeys)

        diff = newschema.diffFrom(oldschema)

        if diff:
            old: dict[str, Any] = oldschema.schema
            new: dict[str, Any] = newschema.schema

            if diff.added:
                __hermes__.logger.info(f"Types added in Dataschema: {diff.added}")

            if diff.removed:
                __hermes__.logger.info(
                    f"Types removed from Dataschema: {diff.removed},"
                    " purging cache files"
                )
                self.purgeOldCacheFiles(diff.removed)

            if diff.modified:
                for objtype in diff.modified:
                    n = new[objtype]
                    o = old[objtype]
                    # HERMES_ATTRIBUTES: log added/removed attribute names
                    added = n["HERMES_ATTRIBUTES"] - o["HERMES_ATTRIBUTES"]
                    removed = o["HERMES_ATTRIBUTES"] - n["HERMES_ATTRIBUTES"]
                    if added:
                        __hermes__.logger.info(
                            f"New attributes in dataschema type '{objtype}': {added}"
                        )
                    if removed:
                        __hermes__.logger.info(
                            f"Removed attributes from dataschema type '{objtype}':"
                            f" {removed}"
                        )

                    # SECRETS_ATTRIBUTES
                    added = n["SECRETS_ATTRIBUTES"] - o["SECRETS_ATTRIBUTES"]
                    removed = o["SECRETS_ATTRIBUTES"] - n["SECRETS_ATTRIBUTES"]
                    if added:
                        __hermes__.logger.info(
                            f"New secrets attributes in dataschema type '{objtype}':"
                            f" {added}"
                        )
                        # We need to purge attribute from cache: as cache is loaded with
                        # attribute set up as SECRET, we just have to save the cache
                        # (attr won't be saved anymore, as it's SECRET) and reload
                        # cache to "forget" values loaded from previous cache
                        self.saveRemoteData()
                        self.loadRemoteData()
                    if removed:
                        __hermes__.logger.info(
                            "Removed secrets attributes from dataschema type"
                            f" '{objtype}': {removed}"
                        )

                    # PRIMARYKEY_ATTRIBUTE: record changed pkeys in the returned dicts
                    npkey = n["PRIMARYKEY_ATTRIBUTE"]
                    opkey = o["PRIMARYKEY_ATTRIBUTE"]
                    if DataObject.isDifferent(npkey, opkey):
                        previouspkeys[objtype] = opkey
                        newpkeys[objtype] = npkey
                        __hermes__.logger.info(
                            "New primary key attribute in dataschema type"
                            f" '{objtype}': {npkey}"
                        )

        return (previouspkeys, newpkeys)

489 

    def convertEventToLocal(
        self,
        event: Event,
        new_obj: DataObject | None = None,
        allowEmptyEvent: bool = False,
    ) -> Event | None:
        """Convert specified remote event to local one.

        If new_obj is provided, it must contain all the new remote object values,
        and will only be used to render Jinja Templates.
        Returns None if the local event doesn't contain any attribute and
        allowEmptyEvent is False, or if event.objtype is not mapped in the client
        datamodel.
        """
        if event.objtype not in self.typesmapping:
            __hermes__.logger.debug(
                f"Unknown {event.objtype=}. Known are {self.typesmapping}"
            )
            return None  # Unknown type

        objtype = self.typesmapping[event.objtype]

        # Handle that event.objattrs is 1 depth deeper for "modified" events
        if event.eventtype == "modified":
            sources = ("added", "modified", "removed")
            objattrs = {"added": {}, "modified": {}, "removed": {}}
        else:
            sources = (None,)
            objattrs = {}

        hasContent: bool = False
        for source in sources:
            # src is the flat attr dict to convert for this source category
            if source is None:
                src = event.objattrs
            else:
                src = event.objattrs[source]

            # Hack to handle Jinja templates containing only static data:
            # a None key in _remote2local maps to local attrs whose template uses no
            # remote variable, so inject a dummy None entry on "added" events
            if None in self._remote2local[event.objtype] and event.eventtype == "added":
                loopsrc = src.copy()
                loopsrc[None] = None
            else:
                loopsrc = src

            for k, v in loopsrc.items():
                if k in self._remote2local[event.objtype]:
                    # Each remote attr may feed several local attrs
                    for dest in self._remote2local[event.objtype][k]:
                        remoteattr = self._datamodel[objtype]["attrsmapping"][dest]
                        if isinstance(
                            remoteattr, Template
                        ):  # May be a compiled Jinja Template
                            if new_obj is None:
                                val = remoteattr.render(src)
                            else:
                                # We must provide all new object vars values to
                                # render a Jinja Template computed from several vars,
                                # in case of "modified" event changing the value of
                                # only one var value used by the template.
                                # The event objattrs won't be enough in this specific
                                # case.
                                val = remoteattr.render(new_obj.toNative())

                            # Drop None items from rendered lists
                            if type(val) is list:
                                val = [v for v in val if v is not None]

                            if val is None or val == []:
                                # No value
                                if event.eventtype == "modified":
                                    objattrs["removed"].update({dest: val})
                                elif event.eventtype == "removed":
                                    objattrs.update({dest: val})
                            else:
                                # In modified events, we have to determine if the
                                # attribute is added or modified
                                if event.eventtype == "modified":
                                    _, cachedObj = self.getObjectFromCacheOrTrashbin(
                                        self.localdata_complete,
                                        self.typesmapping[event.objtype],
                                        event.objpkey,
                                    )

                                    if cachedObj is not None and hasattr(
                                        cachedObj, dest
                                    ):
                                        # Ensure the value has changed
                                        previousVal = getattr(cachedObj, dest)
                                        if DataObject.isDifferent(previousVal, val):
                                            objattrs["modified"].update({dest: val})
                                            hasContent = True
                                    else:
                                        # Attr is added
                                        objattrs["added"].update({dest: val})
                                        hasContent = True
                                else:
                                    objattrs.update({dest: val})
                                    hasContent = True
                        else:
                            # Plain 1:1 mapping: copy the remote value as-is
                            if source is None:
                                objattrs.update({dest: v})
                            else:
                                objattrs[source].update({dest: v})
                            hasContent = True

        res = None
        # "removed" events are always emitted, even empty
        if hasContent or allowEmptyEvent or event.eventtype == "removed":
            res = Event(
                evcategory=event.evcategory,
                eventtype=event.eventtype,
                objattrs=objattrs,
            )
            # Carry over the event metadata from the remote event
            res.objtype = objtype
            res.objpkey = event.objpkey
            res.objrepr = str(res.objpkey)
            res.timestamp = event.timestamp
            res.step = event.step
            res.isPartiallyProcessed = event.isPartiallyProcessed
        return res

604 

605 def createLocalDataobject( 

606 self, objtype: str, objattrs: dict[str:Any] 

607 ) -> DataObject: 

608 """Returns instance of specified local Dataobject type from specified 

609 attributes""" 

610 return self.createDataobject(self.local_schema, objtype, objattrs) 

611 

612 def createRemoteDataobject( 

613 self, objtype: str, objattrs: dict[str:Any] 

614 ) -> DataObject: 

615 """Returns instance of specified remote Dataobject type from specified 

616 attributes""" 

617 return self.createDataobject(self.remote_schema, objtype, objattrs) 

618 

619 @staticmethod 

620 def createDataobject( 

621 schema: Dataschema, objtype: str, objattrs: dict[str:Any] 

622 ) -> DataObject: 

623 """Returns instance of specified Dataobject type from specified attributes""" 

624 return schema.objectTypes[objtype](from_json_dict=objattrs) 

625 

626 @staticmethod 

627 def getUpdatedObject(obj: DataObject, objattrs: dict[str, Any]) -> DataObject: 

628 """Return a deepcopy of specified obj, with its attributes updated upon 

629 specified objattrs dict from Event""" 

630 newobj = deepcopy(obj) 

631 

632 # Update newobj attributes 

633 for attrname, value in objattrs["added"].items(): 

634 setattr(newobj, attrname, value) # Add new attributes 

635 for attrname, value in objattrs["modified"].items(): 

636 setattr(newobj, attrname, value) # Update existing attributes 

637 for attrname, value in objattrs["removed"].items(): 

638 if hasattr(newobj, attrname): 

639 delattr(newobj, attrname) # Delete existing attributes 

640 

641 return newobj 

642 

643 def convertDataObjectToLocal(self, obj: DataObject) -> DataObject: 

644 """Convert specified Dataobject (remote) to local one""" 

645 tmpEvent = self.convertEventToLocal( 

646 Event("conversion", "added", obj, obj.toNative()) 

647 ) 

648 return self.local_schema.objectTypes[self.typesmapping[obj.getType()]]( 

649 from_json_dict=tmpEvent.objattrs 

650 ) 

651 

652 def convertDataObjectListToLocal( 

653 self, remoteobjtype: str, objlist: DataObjectList 

654 ) -> DataObjectList: 

655 """Convert specified DataObjectList (remote) to local one""" 

656 localobjs = [self.convertDataObjectToLocal(obj) for obj in objlist] 

657 return self.local_schema.objectlistTypes[self.typesmapping[remoteobjtype]]( 

658 objlist=localobjs 

659 ) 

660 

661 @staticmethod 

662 def purgeOldCacheFiles( 

663 objtypes: list[str] | set[str], 

664 cacheFilePrefix: str = "", 

665 cacheFileSuffix: str = "", 

666 ): 

667 """ "Delete all cache files of specified objtypes""" 

668 for objtype in objtypes: 

669 LocalCache.deleteAllCacheFiles( 

670 f"{cacheFilePrefix}{objtype}{cacheFileSuffix}" 

671 ) 

672 LocalCache.deleteAllCacheFiles( 

673 f"{cacheFilePrefix}{objtype}_complete__{cacheFileSuffix}" 

674 ) 

675 LocalCache.deleteAllCacheFiles( 

676 f"{cacheFilePrefix}trashbin_{objtype}{cacheFileSuffix}" 

677 ) 

678 LocalCache.deleteAllCacheFiles( 

679 f"{cacheFilePrefix}trashbin_{objtype}_complete__{cacheFileSuffix}" 

680 ) 

681 

    def _fillDatamodelDict(self):
        """Build self._datamodel from the raw config: copy each type's settings,
        compiling the 'toString' Jinja template and validating its variables against
        the type's attrsmapping keys."""
        # Fill the datamodel dict
        self._datamodel = {}
        for objtype in self._config["hermes-client"]["datamodel"]:
            self._datamodel[objtype] = {}
            for k, v in self._config["hermes-client"]["datamodel"][objtype].items():
                if k == "attrsmapping":
                    # Copy so later pkey insertions don't mutate the raw config entry
                    self._datamodel[objtype][k] = dict(v)
                elif k == "toString" and v is not None:
                    # Compile toString's jinja template
                    jinjavars = set()
                    self._datamodel[objtype][k] = Jinja.compileIfJinjaTemplate(
                        v,
                        jinjavars,
                        self._jinjaenv,
                        f"hermes-client.datamodel.{objtype}.toString",
                        False,
                        False,
                    )
                    # Ensure jinja vars are known local attrs
                    unknownattrs = (
                        jinjavars
                        - self._config["hermes-client"]["datamodel"][objtype][
                            "attrsmapping"
                        ].keys()
                    )
                    if unknownattrs:
                        raise HermesUnknownVarsInJinjaTemplateError(
                            "Unknown attributes met in 'hermes-client.datamodel"
                            f".{objtype}.toString' jinja template: {unknownattrs}"
                        )
                else:
                    # Any other setting (e.g. hermesType) is kept as-is
                    self._datamodel[objtype][k] = v

715 

    def _fillConversionVars(self):
        """Build self.typesmapping and self._remote2local from self._datamodel and
        the remote schema, compiling attrsmapping Jinja templates, and record
        unknown remote types/attributes in self.unknownRemoteTypes /
        self.unknownRemoteAttributes."""
        # Fill the types mapping (remote as key, local as value)
        typesmapping = {v["hermesType"]: k for k, v in self._datamodel.items()}

        # Types consistency check
        self.unknownRemoteTypes = typesmapping.keys() - self.remote_schema.schema.keys()
        if self.unknownRemoteTypes:
            # Drop unknown types from the mapping: they can't be converted
            for t in self.unknownRemoteTypes:
                del typesmapping[t]

        # Reorder typemapping to respect the order specified on remote schema
        self.typesmapping = {}
        for rtype in self.remote_schema.schema:
            if rtype in typesmapping:
                self.typesmapping[rtype] = typesmapping[rtype]

        # Fill the remote2local mapping dict
        for objsettings in self._datamodel.values():
            remote_objtype = objsettings["hermesType"]
            self._remote2local[remote_objtype] = {}
            # Add primary keys to mapping to ensure they're always available
            if remote_objtype in self.remote_schema.schema:
                pkeys = self.remote_schema.schema[remote_objtype][
                    "PRIMARYKEY_ATTRIBUTE"
                ]
                if type(pkeys) not in [list, tuple]:
                    # Single pkey: normalize to a list
                    pkeys = [pkeys]
                for pkey in pkeys:
                    objsettings["attrsmapping"][f"_pkey_{pkey}"] = pkey

            for local_attr, remote_attr in objsettings["attrsmapping"].items():
                remote_vars = set()
                # Compile the mapping value; remote_vars collects the remote
                # attribute names the template depends on
                objsettings["attrsmapping"][local_attr] = Jinja.compileIfJinjaTemplate(
                    remote_attr,
                    remote_vars,
                    self._jinjaenv,
                    f"hermes-client.datamodel.{remote_objtype}.attrsmapping",
                    False,
                    False,
                )
                if len(remote_vars) == 0:
                    # Hack to handle Jinja templates containing only static data
                    remote_vars.add(None)

                for remote_var in remote_vars:
                    # As many local attrs can be mapped on a same remote attr,
                    # store the mapping in a list
                    self._remote2local[remote_objtype].setdefault(
                        remote_var, []
                    ).append(local_attr)

        # Attributes consistency check
        self.unknownRemoteAttributes = {}
        for rtype in self.typesmapping:
            diff = (
                self._remote2local[rtype].keys()
                - self.remote_schema.schema[rtype]["HERMES_ATTRIBUTES"]
                - set([None])  # Ignore Jinja templates with static data only
            )
            if diff:
                self.unknownRemoteAttributes[rtype] = diff

777 

    def _setupLocalSchema(self) -> Dataschema:
        """Build local schema from local datamodel and remote schema.

        For each mapped type: collects local secrets attributes, derives the local
        primary key (remote pkey prefixed with '_pkey_'), and converts remote
        foreign keys to their local names. Returns the resulting Dataschema.
        """
        rschema: dict[str, Any] = self.remote_schema.schema
        schema: dict[str, Any] = {}
        for objtype in self.typesmapping.values():
            remote_objtype = self._datamodel[objtype]["hermesType"]

            # Local attrs fed by a remote SECRET attr are secret too
            secrets = []
            for attr in rschema[remote_objtype]["SECRETS_ATTRIBUTES"]:
                v = self._remote2local[remote_objtype].get(attr)
                if v is not None:
                    secrets.extend(v)

            # Add primary keys to mapping to ensure they're always there
            pkeys = self.remote_schema.schema[remote_objtype]["PRIMARYKEY_ATTRIBUTE"]
            if type(pkeys) in [list, tuple]:
                pkey = tuple([f"_pkey_{pkey}" for pkey in pkeys])
            else:
                pkey = f"_pkey_{pkeys}"

            # Convert foreign keys dict
            fkeys: dict[str, list[str]] = {}
            for from_attr, (to_obj, to_attr) in self.remote_schema.schema[
                remote_objtype
            ]["FOREIGN_KEYS"].items():
                # In current implementation, foreign key are always
                # single primary keys (not a tuple)
                fkeys[f"_pkey_{from_attr}"] = [
                    self.typesmapping[to_obj],
                    f"_pkey_{to_attr}",
                ]

            schema[objtype] = {
                "HERMES_ATTRIBUTES": set(
                    self._datamodel[objtype]["attrsmapping"].keys()
                ),
                "SECRETS_ATTRIBUTES": set(secrets),
                "CACHEONLY_ATTRIBUTES": set(),
                "LOCAL_ATTRIBUTES": set(),
                "PRIMARYKEY_ATTRIBUTE": pkey,
                "FOREIGN_KEYS": fkeys,
                "TOSTRING": self._datamodel[objtype]["toString"],
            }

        res = Dataschema(schema)
        return res

824 

825 @staticmethod 

826 def getObjectFromCacheOrTrashbin( 

827 ds: Datasource, objtype: str, objpkey: Any 

828 ) -> tuple[DataObjectList | None, DataObject | None]: 

829 """Look for objpkey in maincache and trashbin of objtype in specified ds. 

830 If found, returns a tuple with a the DataObjectList where the object is, and the 

831 object itself. Otherwise, returns (None, None)""" 

832 src: DataObjectList 

833 obj: DataObject | None 

834 for src in (ds[objtype], ds[f"trashbin_{objtype}"]): 

835 obj = src.get(objpkey) 

836 if obj is not None: 

837 return (src, obj) 

838 return (None, None)