Coverage for server/datamodel.py: 77%

302 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:25 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any, Hashable, TYPE_CHECKING 

24 

25if TYPE_CHECKING: # pragma: no cover 

26 # Only for type hints, won't import at runtime 

27 from lib.config import HermesConfig 

28 

29from typing import Callable 

30 

31from jinja2 import StrictUndefined 

32from jinja2.environment import Template 

33import time 

34 

35from lib.datamodel.dataschema import Dataschema 

36from lib.datamodel.dataobject import DataObject 

37from lib.datamodel.dataobjectlist import DataObjectList 

38from lib.datamodel.jinja import ( 

39 HermesNativeEnvironment, 

40 Jinja, 

41 HermesUnknownVarsInJinjaTemplateError, 

42) 

43from lib.plugins import AbstractDataSourcePlugin 

44from lib.datamodel.datasource import Datasource 

45 

46 

class HermesDataModelMissingPrimarykeyError(Exception):
    """Raised when at least one source of a datamodel type does not declare the
    primary key in its attrsmapping"""

50 

51 

class HermesInvalidPrimarykeyTypeError(Exception):
    """Raised when a primary key value fetched from a source is not hashable"""

55 

56 

class HermesDataModelInvalidQueryTypeError(Exception):
    """Raised by _runQuery() when the requested querytype is not a supported one"""

59 

60 

class DatamodelFragment:
    """Handle settings, data and access to remote source data of one datamodel type for
    one source.

    The data from several DatamodelFragment will then be consolidated and merged in and
    by Datamodel.

    Note: logging goes through the `__hermes__` name, which is not defined in this
    module and is expected to be made available globally by the application.
    """

    HERMES_RESERVED_JINJA_VARS = {
        "_SELF",
        "REMOTE_ATTRIBUTES",
        "ITEM_CACHED_VALUES",
        "ITEM_FETCHED_VALUES",
        "CACHED_VALUES",
        "FETCHED_VALUES",
    }
    """Jinja variables names reserved for internal use"""

    def __init__(
        self,
        dataobjtype: str,
        datasourcename: str,
        fragmentSettings: dict[str, str | dict],
        primarykeyattr: str | tuple[str, ...],
        datasourceplugin: AbstractDataSourcePlugin,
        attributesplugins: dict[str, Callable[..., Any]],
    ):
        """Create a new DatamodelFragment of specified dataobjtype, from specified
        datasourcename.

        To allow data to be fetched/commited properly, fragmentSettings (source
        settings) must be specified, as primarykeyattr, datasourceplugin and
        attributesplugins.

        - dataobjtype: name of the datamodel type this fragment belongs to
        - datasourcename: name of the source this fragment fetches from
        - fragmentSettings: settings of the source for this type. The keys read by
          this class are "fetch", "attrsmapping", "secrets_attrs",
          "cacheonly_attrs", "local_attrs", and optionally "commit_one" and
          "commit_all"
        - primarykeyattr: primary key attribute name, or tuple of names
        - datasourceplugin: plugin instance used as a context manager to run queries
        - attributesplugins: callables registered as Jinja filters
        """
        self._dataobjects: list[DataObject] = []
        self._datasourceplugin: AbstractDataSourcePlugin = datasourceplugin
        self._errorcontext: str = (
            f"hermes-server.datamodel.{dataobjtype}.{datasourcename}.attrsmapping"
        )
        self._jinjaenv: HermesNativeEnvironment = HermesNativeEnvironment(
            undefined=StrictUndefined
        )
        self._jinjaenv.filters |= attributesplugins
        self.datasourcename: str = datasourcename
        self._settings: dict[str, str | dict] = fragmentSettings
        self._compiledsettings = Jinja.compileIfJinjaTemplate(
            self._settings, None, self._jinjaenv, self._errorcontext, False, False
        )
        self._dataobjclass: type[DataObject] = self.__createDataObjectsubclass(
            dataobjtype, datasourcename, primarykeyattr
        )

        self.jinjavars_vars: set[str] = set()
        """List of jinja vars used in fragment's fetch vars"""
        self.jinjavars_query: set[str] = set()
        """List of jinja vars used in fragment's query"""

        # The compiled results are discarded on purpose: these two calls are only
        # made to fill self.jinjavars_vars and self.jinjavars_query
        Jinja.compileIfJinjaTemplate(
            self._settings["fetch"].get("vars", {}),
            self.jinjavars_vars,
            self._jinjaenv,
            self._errorcontext,
            False,
            False,
        )
        Jinja.compileIfJinjaTemplate(
            self._settings["fetch"].get("query", ""),
            self.jinjavars_query,
            self._jinjaenv,
            self._errorcontext,
            False,
            False,
        )

    def __createDataObjectsubclass(
        self,
        dataobjtype: str,
        datasourcename: str,
        primarykeyattr: str | tuple[str, ...],
    ) -> type[DataObject]:
        """Dynamically create a new subclass of DataObject class, and set it up
        according to Datamodel.

        The subclass is named "<dataobjtype>_<datasourcename>" and its class
        attributes are filled from the fragment settings.
        """
        newcls: type[DataObject] = Dataschema.createSubclass(
            f"{dataobjtype}_{datasourcename}", DataObject
        )

        newcls.PRIMARYKEY_ATTRIBUTE = primarykeyattr
        newcls.REMOTE_ATTRIBUTES = set()
        # This function will fill newcls.REMOTE_ATTRIBUTES
        newcls.HERMES_TO_REMOTE_MAPPING = Jinja.compileIfJinjaTemplate(
            self._settings["attrsmapping"],
            newcls.REMOTE_ATTRIBUTES,
            self._jinjaenv,
            self._errorcontext,
            True,
            False,
            excludeFlatVars=self.HERMES_RESERVED_JINJA_VARS,
        )
        newcls.HERMES_ATTRIBUTES = set(self._settings["attrsmapping"].keys())
        newcls.SECRETS_ATTRIBUTES = set(self._settings["secrets_attrs"])
        newcls.CACHEONLY_ATTRIBUTES = set(self._settings["cacheonly_attrs"])
        newcls.LOCAL_ATTRIBUTES = set(self._settings["local_attrs"])

        __hermes__.logger.debug(
            f"Created dynamic class:\n"
            f" {newcls.__name__}:\n"
            f" - {newcls.PRIMARYKEY_ATTRIBUTE=}\n"
            f" - {newcls.HERMES_ATTRIBUTES=}\n"
            f" - {newcls.REMOTE_ATTRIBUTES=}\n"
            f" - {newcls.HERMES_TO_REMOTE_MAPPING=}\n"
            f" - {newcls.SECRETS_ATTRIBUTES=}\n"
            f" - {newcls.CACHEONLY_ATTRIBUTES=}\n"
            f" - {newcls.LOCAL_ATTRIBUTES=}"
        )
        return newcls

    def getDataobjClass(self) -> type[DataObject]:
        """Return DataObject subclass of current fragment"""
        return self._dataobjclass

    def _renderAndRunQuery(
        self, section: str, context: dict[str, Any]
    ) -> list[dict[str, Any]] | None:
        """Render the query and query vars of the specified settings section
        ("fetch", "commit_one" or "commit_all") with context, run the resulting
        query on the datasource and return what _runQuery() returned.

        A missing "query" is replaced by an empty string, as done in __init__.
        """
        compiled = self._compiledsettings[section]
        query = compiled.get("query", "")
        if isinstance(query, Template):
            query = query.render(context)

        queryvars = Jinja.renderQueryVars(compiled["vars"], context)
        return self._runQuery(self._settings[section]["type"], query, queryvars)

    def fetch(self, cache: DataObjectList, globalcontext: dict[str, Any]):
        """Fetch data from current fragment source, and store the resulting
        DataObject instances in self._dataobjects.

        - cache: cached objects of the same type; its native values are exposed to
          the query templates as CACHED_VALUES, and the cached values of each
          fetched item are given to the new object as ITEM_CACHED_VALUES
        - globalcontext: extra Jinja vars to expose to the query templates

        Raises HermesInvalidPrimarykeyTypeError if a fetched primary key value is
        not hashable.
        """
        objcls = self.getDataobjClass()
        context = globalcontext | {
            "REMOTE_ATTRIBUTES": objcls.REMOTE_ATTRIBUTES,
            "CACHED_VALUES": cache.toNative(),
        }
        fetcheddata = self._renderAndRunQuery("fetch", context)

        # As primary key may be a tuple, always handle it as a tuple of attributes
        pkeyattr = objcls.PRIMARYKEY_ATTRIBUTE
        compositepkey = isinstance(pkeyattr, tuple)
        pkey_attrs = pkeyattr if compositepkey else (pkeyattr,)

        self._dataobjects = []
        for objdata in fetcheddata:
            # Render each value of the primary key
            objpkeys = []
            for pkey_attr in pkey_attrs:
                remotepkeyattr = self._compiledsettings["attrsmapping"][pkey_attr]
                if isinstance(remotepkeyattr, Template):
                    # Render from compiled Jinja Template
                    objpkeys.append(remotepkeyattr.render(objdata))
                else:
                    # Raw value
                    objpkeys.append(objdata.get(remotepkeyattr))

            # Ensure pkeys values are hashable
            unhashable = {
                attr: value
                for attr, value in zip(pkey_attrs, objpkeys)
                if not isinstance(value, Hashable)
            }
            if unhashable:
                err = (
                    f"Invalid value type for primary key(s) : {unhashable}."
                    " Primary keys must be 'hashable'"
                )
                __hermes__.logger.critical(err)
                raise HermesInvalidPrimarykeyTypeError(err)

            objpkey = tuple(objpkeys) if compositepkey else objpkeys[0]

            item_cache = cache.get(objpkey)
            objcontext = {
                "ITEM_CACHED_VALUES": item_cache.toNative() if item_cache else {}
            }

            self._dataobjects.append(
                objcls(from_remote=objdata, jinjaContextVars=objcontext)
            )

    def commit_one(
        self, item_cached_values: dict[str, Any], item_fetched_values: dict[str, Any]
    ):
        """Commit that one object data changes have successfully sent to message bus.

        Does nothing when the fragment settings have no "commit_one" entry.
        """
        if self._settings.get("commit_one") is None:
            return

        context = {
            "REMOTE_ATTRIBUTES": self.getDataobjClass().REMOTE_ATTRIBUTES,
            "ITEM_CACHED_VALUES": item_cached_values,
            "ITEM_FETCHED_VALUES": item_fetched_values,
        }
        self._renderAndRunQuery("commit_one", context)

    def commit_all(
        self, cached_values: list[dict[str, Any]], fetched_values: list[dict[str, Any]]
    ):
        """Commit that all data fetched has successfully sent to message bus.

        Does nothing when the fragment settings have no "commit_all" entry.
        """
        if self._settings.get("commit_all") is None:
            return

        context = {
            "REMOTE_ATTRIBUTES": self.getDataobjClass().REMOTE_ATTRIBUTES,
            "CACHED_VALUES": cached_values,
            "FETCHED_VALUES": fetched_values,
        }
        self._renderAndRunQuery("commit_all", context)

    def _runQuery(
        self, querytype: str, query: str, queryvars: dict[str, Any]
    ) -> list[dict[str, Any]] | None:
        """Run specified query with specified queryvars on datasource.
        querytype must be one of fetch, add, delete, modify.

        Returns None when querytype isn't "fetch",
        otherwise returns a list of dict containing each entry fetched, with
        REMOTE_ATTRIBUTES as keys, and corresponding fetched values as values

        Raises HermesDataModelInvalidQueryTypeError when querytype is invalid.
        """
        clsname = self.getDataobjClass().__name__
        __hermes__.logger.debug(
            f"{clsname}: _runQuery({querytype=}, {query=}, {queryvars=})"
        )

        # Reject invalid querytype before opening the datasource context
        if querytype not in ("fetch", "add", "delete", "modify"):
            raise HermesDataModelInvalidQueryTypeError(
                f"runQuery called with invalid querytype '{querytype}'"
            )

        fetcheddata = None
        # Monotonic clock: elapsed time must not be affected by system clock updates
        starttime = time.monotonic()
        with self._datasourceplugin:
            if querytype == "fetch":
                fetcheddata = self._datasourceplugin.fetch(query, queryvars)
            elif querytype == "add":
                self._datasourceplugin.add(query, queryvars)
            elif querytype == "delete":
                self._datasourceplugin.delete(query, queryvars)
            elif querytype == "modify":
                self._datasourceplugin.modify(query, queryvars)

        elapsedms = int(round(1000 * (time.monotonic() - starttime)))
        if fetcheddata is None:
            __hermes__.logger.debug(
                f"{clsname}: _runQuery() returned in {elapsedms} ms"
            )
        else:
            __hermes__.logger.debug(
                f"{clsname}:"
                f" _runQuery() returned {len(fetcheddata)} entries in {elapsedms} ms"
            )

        return fetcheddata

335 

336 

class Datamodel:
    """Load and build the Datamodel from config.

    In charge of:
        - generating Dataschema
        - retrieving remote data from all sources, and merging it

    Note: logging goes through the `__hermes__` name, which is not defined in this
    module and is expected to be made available globally by the application.
    """

    def __init__(self, config: "HermesConfig"):
        """Build the datamodel from config"""
        datamodel: dict[str, Any] = config["hermes-server"]["datamodel"]

        self._fragments: dict[str, list[DatamodelFragment]] = {
            k: [] for k in datamodel.keys()
        }

        self._config: "HermesConfig" = config
        self._jinjaenv: HermesNativeEnvironment = HermesNativeEnvironment(
            undefined=StrictUndefined
        )

        # Fill the _fragments dictionary: one fragment per source of each objtype
        for objtype, fragmentslist in self._fragments.items():
            pkeyattr = datamodel[objtype]["primarykeyattr"]
            if isinstance(pkeyattr, list):
                # Composite primary keys are handled as tuples everywhere else
                pkeyattr = tuple(pkeyattr)
            for sourcename, sourcesettings in datamodel[objtype]["sources"].items():
                fragmentslist.append(
                    DatamodelFragment(
                        objtype,
                        sourcename,
                        sourcesettings,
                        pkeyattr,
                        config["hermes"]["plugins"]["datasources"][sourcename][
                            "plugininstance"
                        ],
                        config["hermes"]["plugins"]["attributes"]["_jinjafilters"],
                    )
                )

        # Consolidate _fragments data to set up the Dataschema
        self.dataschema: Dataschema = self.__setupSchema()
        """Current Dataschema"""

        # Compile Jinja template of integrity_constraints, and store vars of
        # merge_constraints for "lazy" generation of template vars
        self._compileJinja()

        # Load Datasource with cache
        self.data: Datasource = Datasource(
            schema=self.dataschema, enableTrashbin=False, enableCache=True
        )

    def __setupSchema(self) -> Dataschema:
        """Consolidate _fragments data to set up the Dataschema.

        Raises HermesDataModelMissingPrimarykeyError when a primary key attribute
        is not in the attrsmapping of every source of its type, and
        HermesUnknownVarsInJinjaTemplateError when the toString template of a type
        uses a var that is not one of its attributes.
        """
        schema: dict[str, Any] = {}
        datamodel: dict[str, Any] = self._config["hermes-server"]["datamodel"]

        for objtype, fragments in self._fragments.items():
            # Number of sources declaring each attribute
            count: dict[str, int] = {}
            secrets_attrs = set()
            cacheonly_attrs = set()
            local_attrs = set()

            # Save foreign keys, they'll be checked later
            foreign_keys: dict[str, list[str]] = {
                localkey: [v["from_objtype"], v["from_attr"]]
                for localkey, v in datamodel[objtype]["foreignkeys"].items()
            }

            # Consolidate the attributes of each source
            for fragment in fragments:
                objcls = fragment.getDataobjClass()
                secrets_attrs |= objcls.SECRETS_ATTRIBUTES
                cacheonly_attrs |= objcls.CACHEONLY_ATTRIBUTES
                local_attrs |= objcls.LOCAL_ATTRIBUTES
                for attr in objcls.HERMES_ATTRIBUTES:
                    count[attr] = count.get(attr, 0) + 1
            pkey = fragments[0].getDataobjClass().PRIMARYKEY_ATTRIBUTE

            # Ensure primarykey is in attrsmapping of each source. count.get() is
            # used because a key missing from every source has no entry at all
            pkeyattrs = pkey if isinstance(pkey, tuple) else (pkey,)
            for key in pkeyattrs:
                if count.get(key, 0) != len(fragments):
                    raise HermesDataModelMissingPrimarykeyError(
                        f"The primary key '{pkey}' must be fetched from each datasource"
                    )

            # Compile toString jinja template
            jinjavars = set()
            tostringTpl = Jinja.compileIfJinjaTemplate(
                datamodel[objtype]["toString"],
                jinjavars,
                self._jinjaenv,
                f"hermes-server.datamodel.{objtype}.toString",
                False,
                False,
            )
            # Ensure jinja vars are known local attrs
            unknownattrs = jinjavars - set(count)
            if unknownattrs:
                raise HermesUnknownVarsInJinjaTemplateError(
                    "Unknown attributes met in 'hermes-server.datamodel"
                    f".{objtype}.toString' jinja template: {unknownattrs}"
                )
            schema[objtype] = {
                "HERMES_ATTRIBUTES": set(count),
                "SECRETS_ATTRIBUTES": secrets_attrs,
                "CACHEONLY_ATTRIBUTES": cacheonly_attrs,
                "LOCAL_ATTRIBUTES": local_attrs,
                "PRIMARYKEY_ATTRIBUTE": pkey,
                "FOREIGN_KEYS": foreign_keys,
                "TOSTRING": tostringTpl,
            }

        return Dataschema(schema)

    def _compileJinja(self):
        """Compile Jinja template of integrity_constraints, and store vars of
        merge_constraints for "lazy" generation of template vars.

        The results are stored in the config itself, for each type, under the keys
        "merge_constraints_vars", "integrity_constraints_vars" and
        "integrity_constraints" (replaced by its compiled version).
        """
        for dataobjtype, settings in self._config["hermes-server"]["datamodel"].items():
            # Fill merge_constraints_vars that will contain required vars by differents
            # merge_constraints specified in sources. This will allow lazy generation of
            # Jinja env vars
            settings["merge_constraints_vars"] = set()
            for srcname, srcsettings in settings["sources"].items():
                Jinja.compileIfJinjaTemplate(
                    var=srcsettings["merge_constraints"],
                    flatvars_set=settings["merge_constraints_vars"],
                    jinjaenv=self._jinjaenv,
                    errorcontext=(
                        f"hermes-server.datamodel.{dataobjtype}.sources.{srcname}"
                        ".merge_constraints"
                    ),
                    allowOnlyOneTemplate=False,
                    allowOnlyOneVar=False,
                )

            # Compile integrity_constraints templates and fill
            # integrity_constraints_vars that will contain required vars.
            # This will allow lazy generation of Jinja env vars
            settings["integrity_constraints_vars"] = set()
            settings["integrity_constraints"] = Jinja.compileIfJinjaTemplate(
                var=settings["integrity_constraints"],
                flatvars_set=settings["integrity_constraints_vars"],
                jinjaenv=self._jinjaenv,
                errorcontext=(
                    f"hermes-server.datamodel.{dataobjtype}.integrity_constraints"
                ),
                allowOnlyOneTemplate=False,
                allowOnlyOneVar=False,
            )

    def fetch(self):
        """Fetch data from all sources, enforce merge and integrity constraints, and
        store merged data in 'data' attribute"""
        # Load data starting in specific order to minimize inconsistencies if any
        # modifications are processed in the same time
        for objtype, objlistcls in self.dataschema.objectlistTypes.items():
            settings = self._config["hermes-server"]["datamodel"][objtype]
            dontMergeOnConflict = settings["on_merge_conflict"] == "use_cached_entry"
            cache = self.data.cache[objtype]
            fragments = self._fragments[objtype]
            mergeFiltered: set[Any] = set()  # pkeys

            # Fetch data
            starttime = time.monotonic()
            for fragment in fragments:
                # Fetch fragment data from remote source
                fragment.fetch(cache, self._buildFetchContext(objtype, fragment))

            elapsedms = int(round(1000 * (time.monotonic() - starttime)))
            __hermes__.logger.debug(
                f"Fetched and converted all <{objtype}> data in {elapsedms} ms"
            )

            # Enforce merge constraints, if any
            if any(
                len(i._compiledsettings["merge_constraints"]) > 0 for i in fragments
            ):
                mergeFiltered |= self._enforceMergeConstraints(
                    objtype, objlistcls, settings["merge_constraints_vars"]
                )

            # Merge data and store it in current Datasource var
            self.data[objtype] = self._mergeFragments(
                objtype, objlistcls, dontMergeOnConflict, mergeFiltered
            )

            # Replace inconsistencies and merge conflicts by cache values
            self.data[objtype].replaceInconsistenciesByCachedValues(
                self.data.cache[objtype]
            )

        # Enforce integrity constraints, if any
        if any(
            len(self._config["hermes-server"]["datamodel"][t]["integrity_constraints"])
            > 0
            for t in self.dataschema.objectlistTypes
        ):
            self._enforceIntegrityConstraints()

    def _buildFetchContext(self, objtype: str, fragment) -> dict[str, Any]:
        """Return the global Jinja context for the fetch of fragment: the
        "<objtype>_pkeys" and "<objtype>" vars of every type, generated only if
        used by the fragment's fetch "query" or "vars".

        Types that are not already processed (objtype itself and those iterated
        after it) get an empty var, and a warning is logged.
        """
        requestedvars = fragment.jinjavars_vars | fragment.jinjavars_query
        context: dict[str, Any] = {}
        alreadyProcessed = True
        for context_objtype in self.dataschema.objectlistTypes:
            if context_objtype == objtype:
                # Current and remaining context_objtype haven't been processed yet
                alreadyProcessed = False

            # Generate context only if required
            pkeysvar = context_objtype + "_pkeys"
            if pkeysvar in requestedvars:
                if alreadyProcessed:
                    context[pkeysvar] = self.data[context_objtype].getPKeys()
                else:
                    __hermes__.logger.warning(
                        f"You're trying to use '{context_objtype}_pkeys' var"
                        " of an objtype declared after the current one"
                        f" ({objtype}) in datamodel, which has therefore not"
                        " yet been processed. Will use an empty var."
                    )
                    context[pkeysvar] = set()
            if context_objtype in requestedvars:
                if alreadyProcessed:
                    context[context_objtype] = self.data[context_objtype].toNative()
                else:
                    __hermes__.logger.warning(
                        f"You're trying to use '{context_objtype}' var of "
                        "an objtype declared after the current one "
                        f"({objtype}) in datamodel, which has therefore not"
                        " yet been processed. Will use an empty var."
                    )
                    context[context_objtype] = []
        return context

    def _enforceMergeConstraints(
        self, objtype: str, objlistcls, constraintsvars: set[str]
    ) -> set[Any]:
        """Remove from each fragment of objtype the objects that don't satisfy the
        fragment's merge constraints, and return the pkeys of the removed objects.

        - objlistcls: class used to wrap the objects of a fragment
        - constraintsvars: Jinja vars used by the merge constraints of objtype, to
          generate only the required template vars
        """
        fragments = self._fragments[objtype]
        filtered: set[Any] = set()  # pkeys
        selfRequired = "_SELF" in constraintsvars

        starttime = time.monotonic()
        hasChanged = True
        # Loop until no change is made, as removing an object changes the vars the
        # constraints are evaluated with
        while hasChanged:
            hasChanged = False

            # Fill vars dict
            tplvars: dict[str, Any] = {}
            for fragment in fragments:
                dataobjlist = objlistcls(objlist=fragment._dataobjects)
                srcname = fragment.datasourcename
                # Generate vars only if required
                if srcname + "_pkeys" in constraintsvars:
                    tplvars[srcname + "_pkeys"] = dataobjlist.getPKeys()
                if srcname in constraintsvars:
                    tplvars[srcname] = dataobjlist.toNative()

            # Apply constraints
            for fragment in fragments:
                constraints = fragment._compiledsettings["merge_constraints"]
                if not constraints:
                    continue
                toRemove = set()
                for obj in fragment._dataobjects:
                    # Generate _SELF var only if required
                    if selfRequired:
                        tplvars["_SELF"] = obj.toNative()
                    # all() stops at the first constraint that is not satisfied
                    if not all(constraint.render(tplvars) for constraint in constraints):
                        toRemove.add(obj)
                if toRemove:
                    hasChanged = True
                    for obj in toRemove:
                        fragment._dataobjects.remove(obj)
                        filtered.add(obj.getPKey())

        elapsedms = int(round(1000 * (time.monotonic() - starttime)))
        __hermes__.logger.debug(
            f"Enforced <{objtype}> merge constraints in {elapsedms} ms:"
            f" filtered {len(filtered)} item(s)"
        )
        return filtered

    def _mergeFragments(
        self,
        objtype: str,
        objlistcls,
        dontMergeOnConflict: bool,
        mergeFiltered: set[Any],
    ):
        """Merge the objects of all fragments of objtype, and return the resulting
        objlistcls instance.

        mergeFiltered is updated in place with the pkeys returned by each merge,
        then added to the mergeFiltered attribute of the returned list.
        """
        starttime = time.monotonic()
        objlist = None
        for fragment in self._fragments[objtype]:
            if objlist is None:  # First fragment
                objlist = objlistcls(objlist=fragment._dataobjects)
            else:  # other fragments than first
                mergeFiltered |= objlist.mergeWith(
                    objlist=fragment._dataobjects,
                    pkeyMergeConstraint=fragment._settings["pkey_merge_constraint"],
                    dontMergeOnConflict=dontMergeOnConflict,
                )
        objlist.mergeFiltered |= mergeFiltered
        elapsedms = int(round(1000 * (time.monotonic() - starttime)))
        __hermes__.logger.debug(
            f"Merged all <{objtype}> data in {elapsedms} ms: filtered"
            f" {len(mergeFiltered)} item(s)"
        )
        return objlist

    def _enforceIntegrityConstraints(self):
        """Remove from the merged data of every type the objects that don't
        satisfy the integrity constraints of their type, and add their pkeys to
        the integrityFiltered attribute of the corresponding object list."""
        datamodel = self._config["hermes-server"]["datamodel"]
        starttime = time.monotonic()

        # Store all integrity constraints vars
        all_integrity_vars = set()
        for objtype in self.dataschema.objectlistTypes:
            all_integrity_vars.update(datamodel[objtype]["integrity_constraints_vars"])

        hasChanged = True
        # Loop until no change is made, as removing an object changes the vars the
        # constraints are evaluated with
        while hasChanged:
            hasChanged = False

            # Fill vars dict
            tplvars: dict[str, Any] = {}
            for objtype in self.dataschema.objectlistTypes:
                # Generate vars only if required
                if objtype + "_pkeys" in all_integrity_vars:
                    tplvars[objtype + "_pkeys"] = self.data[objtype].getPKeys()
                if objtype in all_integrity_vars:
                    tplvars[objtype] = self.data[objtype].toNative()

            # Apply constraints
            for objtype in self.dataschema.objectlistTypes:
                settings = datamodel[objtype]
                constraints = settings["integrity_constraints"]
                if not constraints:
                    continue
                selfRequired = "_SELF" in settings["integrity_constraints_vars"]

                integrityFiltered = set()  # pkeys
                for obj in self.data[objtype]:
                    # Generate _SELF var only if required
                    if selfRequired:
                        tplvars["_SELF"] = obj.toNative()
                    # all() stops at the first constraint that is not satisfied
                    if not all(constraint.render(tplvars) for constraint in constraints):
                        integrityFiltered.add(obj.getPKey())
                if integrityFiltered:
                    hasChanged = True
                    for pkey in integrityFiltered:
                        self.data[objtype].removeByPkey(pkey)
                    __hermes__.logger.debug(
                        f"Integrity constraints: filtered {len(integrityFiltered)}"
                        f" item(s) from {objtype}"
                    )
                    self.data[objtype].integrityFiltered |= integrityFiltered

        elapsedms = int(round(1000 * (time.monotonic() - starttime)))
        __hermes__.logger.debug(f"Integrity constraints enforced in {elapsedms} ms")

    def commit_one(self, obj: DataObject):
        """Commit that specified 'obj' data changes have successfully sent to message
        bus.

        Raises TypeError when obj is not an instance of any type of the Dataschema.
        """
        for objtype, objcls in self.dataschema.objectTypes.items():
            if isinstance(obj, objcls):
                break
        else:
            # Loop ended without break: obj matches no known type
            raise TypeError(
                f"Specified object {repr(obj)} has invalid type '{type(obj)}'"
            )

        cachedobj: DataObject | None = self.data.cache[objtype].get(obj.getPKey())
        cachedvalues = cachedobj.toNative() if cachedobj else {}
        fetchedvalues = obj.toNative()

        for fragment in self._fragments[objtype]:
            fragment.commit_one(cachedvalues, fetchedvalues)

    def commit_all(self, objtype: str):
        """Commit that all data fetched has successfully sent to message bus"""
        cachedvalues = self.data.cache[objtype].toNative()
        fetchedvalues = self.data[objtype].toNative()

        for fragment in self._fragments[objtype]:
            fragment.commit_all(cachedvalues, fetchedvalues)