Coverage for clients / errorqueue.py: 97%

337 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:10 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from copy import deepcopy 

24from typing import Any, Iterable 

25 

26from lib.datamodel.event import Event 

27from lib.datamodel.dataobject import DataObject 

28from lib.datamodel.datasource import Datasource 

29from lib.datamodel.foreignkey import ForeignKey 

30from lib.datamodel.serialization import LocalCache 

31 

32 

class HermesInvalidErrorQueueJSONError(Exception):
    """Error raised when the JSON data given to load an ErrorQueue is invalid"""

35 

36 

37class ErrorQueue(LocalCache): 

38 """Store and manage an indexed event queue. Useful for retrying Event in error""" 

39 

def __init__(
    self,
    typesMapping: dict[str, list[str]],
    remotedata: Datasource | None = None,
    remotedata_complete: Datasource | None = None,
    localdata: Datasource | None = None,
    localdata_complete: Datasource | None = None,
    from_json_dict: dict[str, Any] | None = None,
    autoremediate: str = "disabled",
):
    """Build a new Event queue: empty by default, or filled with the content of
    'from_json_dict' when it is specified.

    'typesMapping' associates each remote object type with the list of its local
    object types. 'from_json_dict' must contain the "_queue" key only, otherwise
    HermesInvalidErrorQueueJSONError is raised. Any 'autoremediate' value other
    than "conservative" or "maximum" disables autoremediation.
    """

    self._autoremediate: str | None = (
        autoremediate if autoremediate in ("conservative", "maximum") else None
    )
    """Autoremediation policy to apply, or None when autoremediation is disabled"""

    self._queue: dict[int, tuple[Event | None, Event, str | None]] = {}
    """The event queue. Each key is a unique integer (the eventNumber), each value
    is a tuple made of the remote event (None when the entry comes from a "pure"
    local event, eg. a change in the client datamodel), the local event, and an
    optional error message"""

    self._index: dict[str, dict[Any, set[int]]] = {}
    """Index table of events.
    First level key is the local event object type (str), second level key is the
    event object primary key (Any), and the value is the set of every eventNumber
    in queue for those keys

        self._index[localevent.objtype][localevent.objpkey] =
            set([eventNumber1, eventNumber2, ...])
    """

    self._parentObjs: dict[str, dict[Any, set[int]]] = {}
    """Index of the parent objects of the objects in error, with the eventNumbers
    that have stored them in this state.
    First level key is the local object type (str), second level key is the object
    primary key (Any), and the value is the set of every eventNumber in queue for
    those keys

        self._parentObjs[objtype][objpkey] = set([eventNumber1, eventNumber2, ...])
    """

    localToRemote: dict[str, str] = {}
    remoteToLocals: dict[str, list[str]] = {}
    for remotetype, localtypes in typesMapping.items():
        for localtype in localtypes:
            localToRemote[localtype] = remotetype
            # Set from inside the loop: a remote type without any local type
            # is never registered
            remoteToLocals[remotetype] = localtypes
    self._typesMapping = {"local": localToRemote, "remote": remoteToLocals}
    """Mapping between local and remote objects types
    - self._typesMapping["local"]["local_type"] gives the matching remote type
    - self._typesMapping["remote"]["remote_type"] gives the matching local types
    """

    super().__init__(jsondataattr=["_queue"])

    self.updateDatasources(
        remotedata, remotedata_complete, localdata, localdata_complete
    )

    if not from_json_dict:
        # Nothing to load, the queue stays empty
        return

    if from_json_dict.keys() != {"_queue"}:
        raise HermesInvalidErrorQueueJSONError(f"{from_json_dict=}")

    # Work on a copy, to leave the deep references of from_json_dict untouched
    serializedQueue = deepcopy(from_json_dict)["_queue"]
    for number, (remoteDict, localDict, message) in serializedQueue.items():
        remoteEvent = None if remoteDict is None else Event(from_json_dict=remoteDict)
        localEvent = Event(from_json_dict=localDict)
        self._append(remoteEvent, localEvent, message, int(number))

125 

def updateDatasources(
    self,
    remotedata: Datasource | None = None,
    remotedata_complete: Datasource | None = None,
    localdata: Datasource | None = None,
    localdata_complete: Datasource | None = None,
):
    """Replace the references to the datasources in use (needed by one of the
    autoremediation cases), then rebuild the parent objects index from the
    events currently in queue"""
    self._remotedata: Datasource | None = remotedata
    self._remotedata_complete: Datasource | None = remotedata_complete
    self._localdata: Datasource | None = localdata
    self._localdata_complete: Datasource | None = localdata_complete

    # The parent objects index depends on the datasources: start again from
    # scratch and re-index every queued event
    self._parentObjs = {}
    for queuedNumber in self._queue:
        self._addParentObjs(queuedNumber)

144 

def append(
    self, remoteEvent: Event | None, localEvent: Event | None, errorMsg: str | None
):
    """Queue the specified events, under the eventNumber following the highest
    one currently in queue (1 when the queue is empty)"""
    nextEventNumber = max(self._queue, default=0) + 1
    self._append(remoteEvent, localEvent, errorMsg, nextEventNumber)

152 

153 def _append( 

154 self, 

155 remoteEvent: Event | None, 

156 localEvent: Event | None, 

157 errorMsg: str | None, 

158 eventNumber: int, 

159 ): 

160 """Append specified event to queue, at specified eventNumber""" 

161 if eventNumber in self._queue: 

162 raise IndexError(f"Specified {eventNumber=} already exist in queue") 

163 

164 if ( 

165 remoteEvent is not None 

166 and remoteEvent.objtype not in self._typesMapping["remote"] 

167 ): 

168 __hermes__.logger.info( 

169 "Ignore loading of remote event of unknown objtype" 

170 f" {remoteEvent.objtype}" 

171 ) 

172 return 

173 

174 if localEvent.objtype not in self._typesMapping["local"]: 

175 __hermes__.logger.info( 

176 f"Ignore loading of local event of unknown objtype {localEvent.objtype}" 

177 ) 

178 return 

179 

180 self._queue[eventNumber] = (remoteEvent, localEvent, errorMsg) 

181 self._addEventToIndex(eventNumber) 

182 self._addParentObjs(eventNumber) 

183 

184 if self._autoremediate: 

185 self._remediateWithPrevious(eventNumber) 

186 

187 def _mergeEvents( 

188 self, 

189 prevEvent: Event | None, 

190 lastEvent: Event | None, 

191 datasource: Datasource | None, 

192 datasource_complete: Datasource | None, 

193 previousEvents: list[Event], 

194 ) -> tuple[bool, Event | None]: 

195 """Merge two events for remediation, and returns the result as a tuple with 

196 - wasMerged : a boolean indicating that the merge was done, meaning that the 

197 last event must be removed. If False, the values of removeBothEvents and 

198 newEvent in tuple must be ignored 

199 - removeBothEvents: a boolean indicating that the merge consists of both events 

200 removal. If True, the value of newEvent must be ignored 

201 - newEvent: the merged Event 

202 """ 

203 # Handle None values, that may only occurs for remote events 

204 if lastEvent is None and prevEvent is None: 

205 # No data : merging is easy 

206 __hermes__.logger.info("Merging two None events, result is None") 

207 return (True, False, None) 

208 elif lastEvent is None: 

209 # Keep prevEvent values 

210 __hermes__.logger.info( 

211 f"Merging {prevEvent.objattrs=} with lastEvent=None," 

212 f" result is {prevEvent=}" 

213 ) 

214 return (True, False, prevEvent) 

215 elif prevEvent is None: 

216 # Keep lastEvent values 

217 __hermes__.logger.info( 

218 f"Merging prevEvent=None with {lastEvent.objattrs=}," 

219 f" result is {lastEvent=}" 

220 ) 

221 return (True, False, lastEvent) 

222 

223 if ( 

224 (prevEvent.eventtype == "added" and lastEvent.eventtype == "added") 

225 or (prevEvent.eventtype == "removed" and lastEvent.eventtype == "modified") 

226 or (prevEvent.eventtype == "removed" and lastEvent.eventtype == "removed") 

227 or (prevEvent.eventtype == "modified" and lastEvent.eventtype == "added") 

228 ): 

229 errmsg = ( 

230 f"BUG : trying to merge a {lastEvent.eventtype} event with a" 

231 f" previous {prevEvent.eventtype} event, this should never happen." 

232 f" {lastEvent=} {prevEvent=}" 

233 ) 

234 __hermes__.logger.critical(errmsg) 

235 raise AssertionError(errmsg) 

236 

237 elif prevEvent.eventtype == "added" and lastEvent.eventtype == "modified": 

238 # Merge the modified event data into the added event 

239 mergedEvent = deepcopy(prevEvent) 

240 objattrs = mergedEvent.objattrs 

241 objattrs.update(lastEvent.objattrs.get("added", dict())) 

242 objattrs.update(lastEvent.objattrs.get("modified", dict())) 

243 for key in ( 

244 objattrs.keys() & lastEvent.objattrs.get("removed", dict()).keys() 

245 ): 

246 del objattrs[key] 

247 

248 __hermes__.logger.info( 

249 f"Merging added {prevEvent.objattrs=} with modified" 

250 f" {lastEvent.objattrs=}, result is added {objattrs=}" 

251 ) 

252 return (True, False, mergedEvent) 

253 elif prevEvent.eventtype == "added" and lastEvent.eventtype == "removed": 

254 if self._autoremediate == "maximum": 

255 # Remove the two events 

256 return (True, True, None) 

257 # Use "conservative" as fallback : don't merge the events 

258 return (False, False, None) 

259 elif prevEvent.eventtype == "removed" and lastEvent.eventtype == "added": 

260 if self._autoremediate == "maximum": 

261 # Ensure required data to process is available, otherwise fallback to 

262 # "conservative" policy 

263 if datasource is None: 

264 __hermes__.logger.info( 

265 f"Unable to merge removed {prevEvent=} with added " 

266 f"{lastEvent.objattrs=}, as no datasource is available. " 

267 "Fallback to 'conservative' mode." 

268 ) 

269 else: 

270 currentObj: DataObject | None = datasource.get( 

271 lastEvent.objtype, {} 

272 ).get(lastEvent.objpkey, None) 

273 

274 newObj: DataObject | None = datasource_complete.get( 

275 lastEvent.objtype, {} 

276 ).get(lastEvent.objpkey, None) 

277 

278 if len(previousEvents) != 0: 

279 # There are previous unprocesed events in error queue. 

280 # Merging is possible but first we have to apply previous 

281 # events changes to (a copy of) currentObj in order to 

282 # determine object's state before prevEvent 

283 

284 # Imported here to avoid circular dependency 

285 from .datamodel import Datamodel 

286 

287 currentObj = deepcopy(currentObj) 

288 ev: Event 

289 for ev in previousEvents: 

290 if ev is None: 

291 continue 

292 elif ev.eventtype == "added": 

293 currentObj = Datamodel.createDataobject( 

294 datasource.schema, ev.objtype, ev.objattrs 

295 ) 

296 elif ev.eventtype == "modified": 

297 if currentObj is None: 

298 # Should never occur 

299 errmsg = ( 

300 "BUG : unexpected object status met when trying" 

301 f" to merge two events {lastEvent=}" 

302 f" {lastEvent.eventtype=} ; {prevEvent=}" 

303 f" {prevEvent.eventtype=}" 

304 ) 

305 __hermes__.logger.critical(errmsg) 

306 raise AssertionError(errmsg) 

307 currentObj = Datamodel.getUpdatedObject( 

308 currentObj, ev.objattrs 

309 ) 

310 elif ev.eventtype == "removed": 

311 # Should never occur 

312 currentObj = None 

313 

314 if currentObj is None or newObj is None: 

315 __hermes__.logger.warning( 

316 f"BUG ? - Unable to merge removed {prevEvent=} with added " 

317 f"{lastEvent.objattrs=}, as related object was not found " 

318 f"in caches. {currentObj=} {newObj=}" 

319 ) 

320 else: 

321 # Diff the desired object with current one to generate a 

322 # modified event 

323 mergedEvent, _ = Event.fromDiffItem( 

324 newObj.diffFrom(currentObj), "base", "modified" 

325 ) 

326 

327 if ( 

328 mergedEvent.objattrs["added"] 

329 or mergedEvent.objattrs["modified"] 

330 or mergedEvent.objattrs["removed"] 

331 ): 

332 __hermes__.logger.info( 

333 f"Merging removed {prevEvent=} with added" 

334 f" {lastEvent.objattrs=}, result is modified" 

335 f" {mergedEvent=} {mergedEvent.objattrs=}" 

336 ) 

337 return (True, False, mergedEvent) 

338 else: 

339 __hermes__.logger.info( 

340 f"Merging removed {prevEvent=} with added " 

341 f"{lastEvent.objattrs=}, result is an empty modified" 

342 " event (without any change). Ignoring it" 

343 ) 

344 return (True, True, mergedEvent) 

345 

346 # Use "conservative" as fallback : don't merge the events 

347 return (False, False, None) 

348 elif prevEvent.eventtype == "modified" and lastEvent.eventtype == "modified": 

349 # Merge the two modified events 

350 mergedEvent = deepcopy(prevEvent) 

351 objattrs = mergedEvent.objattrs 

352 

353 lastAdded: dict[str, Any] = lastEvent.objattrs.get("added", dict()) 

354 lastModified: dict[str, Any] = lastEvent.objattrs.get("modified", dict()) 

355 lastRemoved: dict[str, Any] = lastEvent.objattrs.get("removed", dict()) 

356 

357 # Newly added attributes 

358 for attr, newval in lastAdded.items(): 

359 # # Newly added should not exist in prev modified 

360 # if attr in objattrs["modified"]: 

361 # raise AssertionError 

362 # Merge prev added with last added 

363 objattrs["added"][attr] = newval 

364 # As attr is now added, it should not be removed anymore 

365 if attr in objattrs["removed"]: 

366 del objattrs["removed"][attr] 

367 

368 # Newly modified attributes 

369 for attr, newval in lastModified.items(): 

370 # # Newly modified should not exist in prev removed 

371 # if attr in objattrs["removed"]: 

372 # raise AssertionError 

373 # If was added in prev, keep it as added, but update its value 

374 if attr in objattrs["added"]: 

375 objattrs["added"][attr] = newval 

376 else: 

377 # Keep or update the last modified value 

378 objattrs["modified"][attr] = newval 

379 

380 # Newly removed attributes 

381 for attr, newval in lastRemoved.items(): 

382 # # Newly removed should not exist in prev removed 

383 # if attr in objattrs["removed"]: 

384 # raise AssertionError 

385 if attr in objattrs["added"]: 

386 # Added + removed : nothing to keep 

387 del objattrs["added"][attr] 

388 else: 

389 if attr in objattrs["modified"]: 

390 # Modified + removed : don't keep the modified 

391 del objattrs["modified"][attr] 

392 # Keep the removed 

393 objattrs["removed"][attr] = newval 

394 

395 __hermes__.logger.info( 

396 f"Merging modified {prevEvent.objattrs=} with modified" 

397 f" {lastEvent.objattrs=}, result is modified {objattrs=}" 

398 ) 

399 return (True, False, mergedEvent) 

400 elif prevEvent.eventtype == "modified" and lastEvent.eventtype == "removed": 

401 if self._autoremediate == "maximum": 

402 # Remove prevEvent 

403 __hermes__.logger.info( 

404 f"Merging modified {prevEvent.objattrs=} with removed" 

405 f" {lastEvent.objattrs=}, result is removed {lastEvent=}" 

406 ) 

407 return (True, False, lastEvent) 

408 # Use "conservative" as fallback : don't merge the events 

409 return (False, False, None) 

410 else: 

411 errmsg = ( 

412 "BUG : unexpected eventtype met when trying to merge two events " 

413 f"{lastEvent=} {lastEvent.eventtype=}" 

414 f" ; {prevEvent=} {prevEvent.eventtype=}" 

415 ) 

416 __hermes__.logger.critical(errmsg) 

417 raise AssertionError(errmsg) 

418 

419 def _remediateWithPrevious(self, eventNumber: int): 

420 lastEventNumber = eventNumber 

421 lastRemoteEvent, lastLocalEvent, lastErrorMsg = self._queue[lastEventNumber] 

422 

423 allEventNumbers = sorted( 

424 self._index[lastLocalEvent.objtype][lastLocalEvent.objpkey] 

425 ) 

426 allEvents = [self._queue[evNum] for evNum in allEventNumbers] 

427 if len(allEvents) < 2: 

428 # No previous event to remediate with 

429 return 

430 

431 previousRemoteEvents = [i[0] for i in allEvents[:-2]] 

432 previousLocalEvents = [i[1] for i in allEvents[:-2]] 

433 

434 prevEventNumber = allEventNumbers[-2] 

435 prevRemoteEvent, prevLocalEvent, prevErrorMsg = allEvents[-2] 

436 

437 # Can't merge partially processed events 

438 if ( 

439 prevLocalEvent.isPartiallyProcessed 

440 or lastLocalEvent.isPartiallyProcessed 

441 or (prevRemoteEvent is not None and prevRemoteEvent.isPartiallyProcessed) 

442 or (lastRemoteEvent is not None and lastRemoteEvent.isPartiallyProcessed) 

443 ): 

444 stepsvalues = [] 

445 if prevRemoteEvent is not None: 

446 stepsvalues.append(f"{prevRemoteEvent.isPartiallyProcessed=}") 

447 else: 

448 stepsvalues.append(f"{prevRemoteEvent=}") 

449 stepsvalues.append(f"{prevLocalEvent.isPartiallyProcessed=}") 

450 if lastRemoteEvent is not None: 

451 stepsvalues.append(f"{lastRemoteEvent.isPartiallyProcessed=}") 

452 else: 

453 stepsvalues.append(f"{lastRemoteEvent=}") 

454 stepsvalues.append(f"{lastLocalEvent.isPartiallyProcessed=}") 

455 

456 __hermes__.logger.info( 

457 "Unable to merge two events of which at least one has already been" 

458 f" partially processed. {' '.join(stepsvalues)}" 

459 ) 

460 return 

461 

462 remotedWasMerged, remoteRemoveBothEvents, newRemoteEvent = self._mergeEvents( 

463 prevRemoteEvent, 

464 lastRemoteEvent, 

465 self._remotedata, 

466 self._remotedata_complete, 

467 previousRemoteEvents, 

468 ) 

469 localWasMerged, localRemoveBothEvents, newLocalEvent = self._mergeEvents( 

470 prevLocalEvent, 

471 lastLocalEvent, 

472 self._localdata, 

473 self._localdata_complete, 

474 previousLocalEvents, 

475 ) 

476 

477 if remotedWasMerged != localWasMerged: 

478 errmsg = ( 

479 "BUG : inconsistency between remote and local merge results :" 

480 f" {remotedWasMerged=}, {localWasMerged=}. {prevEventNumber=}," 

481 f" {lastEventNumber=} {prevRemoteEvent.toString(set())=}" 

482 f" {lastRemoteEvent.toString(set())=}," 

483 f" {prevLocalEvent.toString(set())=}, {lastLocalEvent.toString(set())=}" 

484 ) 

485 __hermes__.logger.critical(errmsg) 

486 raise AssertionError(errmsg) 

487 

488 if not localWasMerged: 

489 # No merge was done 

490 return 

491 

492 # Local processing result is the only one that really matters here 

493 if localRemoveBothEvents: 

494 self.remove(lastEventNumber) 

495 self.remove(prevEventNumber) 

496 else: 

497 # Last event was merged into previous, update previous and remove last from 

498 # queue 

499 self._queue[prevEventNumber] = (newRemoteEvent, newLocalEvent, prevErrorMsg) 

500 self.remove(lastEventNumber) 

501 

502 def _addEventToIndex(self, eventNumber: int): 

503 """Add specified event to index""" 

504 if eventNumber not in self._queue: 

505 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue") 

506 

507 remoteEvent: Event | None 

508 localEvent: Event 

509 remoteEvent, localEvent, errorMsg = self._queue[eventNumber] 

510 

511 objtype = localEvent.objtype 

512 

513 # Create objtype sublevel if it doesn't exist yet 

514 if objtype not in self._index: 

515 self._index[objtype] = {} 

516 

517 if localEvent.objpkey not in self._index[objtype]: 

518 # Create the set with specified eventNumber 

519 self._index[objtype][localEvent.objpkey] = set([eventNumber]) 

520 else: 

521 # Add specified eventNumber to the set 

522 self._index[objtype][localEvent.objpkey].add(eventNumber) 

523 

524 def _addParentObjs(self, eventNumber: int): 

525 """Index parent objects of specified event""" 

526 if self._localdata is None: 

527 # Unable to proceed without a datasource 

528 return 

529 

530 if eventNumber not in self._queue: 

531 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue") 

532 

533 remoteEvent: Event | None 

534 localEvent: Event 

535 remoteEvent, localEvent, errorMsg = self._queue[eventNumber] 

536 

537 fromobj: DataObject | None = self._localdata.get(localEvent.objtype, {}).get( 

538 localEvent.objpkey, None 

539 ) 

540 if fromobj is None: 

541 fromobj = self._localdata_complete.get(localEvent.objtype, {}).get( 

542 localEvent.objpkey, None 

543 ) 

544 if fromobj is None: 

545 return 

546 

547 parents = ForeignKey.fetchParentObjs(self._localdata, fromobj) 

548 

549 for parent in parents: 

550 objtype = parent.getType() 

551 # Create objtype sublevel if it doesn't exist yet 

552 if objtype not in self._parentObjs: 

553 self._parentObjs[objtype] = {} 

554 

555 if parent.getPKey() not in self._parentObjs[objtype]: 

556 # Create the set with specified eventNumber 

557 self._parentObjs[objtype][parent.getPKey()] = set([eventNumber]) 

558 else: 

559 # Add specified eventNumber to the set 

560 self._parentObjs[objtype][parent.getPKey()].add(eventNumber) 

561 

def updateErrorMsg(self, eventNumber: int, errorMsg: str):
    """Replace the error message stored with the events of specified eventNumber.
    Raises IndexError when eventNumber is not in queue"""
    if eventNumber not in self._queue:
        raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")

    # Entries are tuples: build a new one around the unchanged events
    remote, local, _previousMsg = self._queue[eventNumber]
    self._queue[eventNumber] = (remote, local, errorMsg)

571 

def remove(self, eventNumber: int, ignoreMissingEventNumber=False):
    """Drop the entry of specified eventNumber from queue, and its references
    from the events index and from the parent objects index.

    When eventNumber is not in queue, IndexError is raised unless
    ignoreMissingEventNumber is True.
    """
    if eventNumber not in self._queue:
        if not ignoreMissingEventNumber:
            raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")
        return

    _remote, local, _msg = self._queue[eventNumber]
    del self._queue[eventNumber]

    # Events index: drop the reference, then the levels left empty
    eventsOfType = self._index[local.objtype]
    eventsOfObj = eventsOfType[local.objpkey]
    eventsOfObj.remove(eventNumber)
    if not eventsOfObj:
        del eventsOfType[local.objpkey]
    if not eventsOfType:
        del self._index[local.objtype]

    # Parent objects index: the eventNumber may be referenced anywhere, so every
    # entry is visited. Snapshots are iterated as levels are deleted on the way
    for parentType, parentsOfType in tuple(self._parentObjs.items()):
        for parentPKey, eventNumbers in tuple(parentsOfType.items()):
            eventNumbers.discard(eventNumber)
            if not eventNumbers:
                del parentsOfType[parentPKey]

        if not parentsOfType:
            del self._parentObjs[parentType]

610 

611 def __iter__(self) -> Iterable: 

612 """Returns an iterator of current instance events to process 

613 It will ignore events about an object that still have older event in queue 

614 

615 Each entry will contains 4 values: 

616 1. eventNumber: int 

617 2. remoteEvent: remote event, or None if the entry was generated by a 

618 "pure" local event (eg. a change in the client datamodel) 

619 3. localEvent: local event 

620 4. errorMsg: str or None 

621 """ 

622 eventNumber: int 

623 remoteEvent: Event | None 

624 localEvent: Event 

625 errorMsg: str | None 

626 

627 for eventNumber in list(self._queue.keys()): 

628 # Event may have been removed during iteration 

629 # e.g. a call to purgeAllEventsOfDataObject() may remove several events 

630 if eventNumber not in self._queue: 

631 continue 

632 

633 remoteEvent, localEvent, errorMsg = self._queue[eventNumber] 

634 objindex = self._index[localEvent.objtype][localEvent.objpkey] 

635 

636 # If current event isn't the first of its object index, ignore it 

637 # because the previous must be processed before 

638 if eventNumber != min(objindex): 

639 continue 

640 

641 yield (eventNumber, remoteEvent, localEvent, errorMsg) 

642 

def allEvents(self) -> Iterable:
    """Iterate over every event of current instance, in queue order.

    Each item is a tuple of 4 values:
      1. eventNumber: int
      2. remoteEvent: remote event, or None if the entry was generated by a
         "pure" local event (eg. a change in the client datamodel)
      3. localEvent: local event
      4. errorMsg: str or None
    """
    # Iterate a snapshot of the keys: entries may be removed while the caller
    # consumes the iterator, e.g. by purgeAllEventsOfDataObject()
    for number in tuple(self._queue):
        entry = self._queue.get(number)
        if entry is None:
            # Removed since the snapshot was taken
            continue

        remote, local, message = entry
        yield (number, remote, local, message)

666 

667 def __len__(self) -> int: 

668 """Returns the number of Event in queue""" 

669 return len(self._queue) 

670 

def keys(self) -> set[int]:
    """Returns the eventNumbers of current instance, as a new set"""
    return set(self._queue)

674 

675 def _getLocalObjtype(self, objtype: str, isLocalObjtype: bool) -> list[str] | None: 

676 """Returns the local objtype, or None if it doesn't exist""" 

677 if isLocalObjtype: 

678 if objtype in self._typesMapping["local"]: 

679 return [objtype] 

680 else: 

681 return None 

682 else: 

683 return self._typesMapping["remote"].get(objtype) 

684 

def containsObject(self, objtype: str, objpkey: Any, isLocalObjtype: bool) -> bool:
    """Tell whether at least one event of current instance is about the object of
    specified objpkey and objtype. A remote objtype is looked up through each of
    its local objtypes"""
    localTypes = self._getLocalObjtype(objtype, isLocalObjtype)
    if localTypes is None:
        # Unknown objtype
        return False

    return any(
        objpkey in self._index.get(localType, ()) for localType in localTypes
    )

695 

def containsObjectByEvent(self, event: Event, isLocalEvent: bool) -> bool:
    """Tell whether at least one event of current instance is about the object of
    the specified event"""
    objtype, objpkey = event.objtype, event.objpkey
    return self.containsObject(objtype, objpkey, isLocalEvent)

699 

def isEventAParentOfAnotherError(self, event: Event, isLocalEvent: bool) -> bool:
    """Tell whether the object of the specified event is referenced in the parent
    objects index, i.e. is a parent (a foreign key) of an object that has events
    in current instance"""
    localTypes = self._getLocalObjtype(event.objtype, isLocalEvent)
    if localTypes is None:
        # Unknown objtype
        return False

    return any(
        self._parentObjs.get(localType, {}).get(event.objpkey) is not None
        for localType in localTypes
    )

711 

def purgeAllEvents(self, objtype: str, objpkey: Any, isLocalObjtype: bool):
    """Delete all events of specified objpkey of specified objtype from current
    instance.

    When objtype is a remote one, the events of each of its local objtypes are
    deleted. Nothing is done for an unknown objtype, nor for an object without
    any event in queue.
    """
    l_objtypes = self._getLocalObjtype(objtype, isLocalObjtype)
    if l_objtypes is None:
        return

    for l_objtype in l_objtypes:
        # A remote objtype may be mapped to several local objtypes, of which
        # only some hold events for this object. remove() also purges the
        # index levels it empties. Hence the lookup must tolerate missing keys
        eventNumbers = self._index.get(l_objtype, {}).get(objpkey)
        if not eventNumbers:
            continue

        # Loop over a copy as content is removed during iteration
        for eventNumber in list(eventNumbers):
            self.remove(eventNumber)

728 

def purgeAllEventsOfDataObject(self, obj: DataObject, isLocalObjtype: bool):
    """Delete all events about the specified object from current instance. Its
    objtype and objpkey are obtained from obj.getType() and obj.getPKey()"""
    objtype = obj.getType()
    objpkey = obj.getPKey()
    self.purgeAllEvents(objtype, objpkey, isLocalObjtype)

733 

def updatePrimaryKeys(
    self,
    new_remote_pkeys: dict[str, str | tuple[str, ...]],
    remote_data: Datasource,
    remote_data_complete: Datasource,
    new_local_pkeys: dict[str, str | tuple[str, ...]],
    local_data: Datasource,
    local_data_complete: Datasource,
):
    """Will update primary keys. new_remote_pkeys is a dict with remote objtype as
    key, and the new remote primary key attribute name as value. new_local_pkeys is
    a dict with local objtype as key, and the new local primary key attribute name
    as value. For a primary key made of several attributes, the value is a tuple of
    attribute names, and the new primary key is a tuple of their values.
    The specified datasources must not have their primary keys updated yet, to
    allow conversion.

    Only the queue content is replaced here: the events index and the parent
    objects index are left as they were.

    The ErrorQueue MUST immediately be saved and re-instantiated from cache by
    caller to reflect data changes.
    """

    def computeNewPkey(event, pkeyattrs, data, data_complete):
        # Read the new primary key value(s) from the object the event is about,
        # found by its current pkey in data, or else in data_complete
        oldobj = data[event.objtype].get(event.objpkey)
        if oldobj is None:
            oldobj = data_complete[event.objtype][event.objpkey]
        if isinstance(pkeyattrs, tuple):
            return tuple(getattr(oldobj, pkattr) for pkattr in pkeyattrs)
        return getattr(oldobj, pkeyattrs)

    newqueue = {}
    for eventNumber, (remoteEvent, localEvent, errorMsg) in self._queue.items():
        # Remote event, if any. Kept as is when its objtype has no pkey update
        newRemoteEvent = remoteEvent
        if remoteEvent is not None and remoteEvent.objtype in new_remote_pkeys:
            newpkey = computeNewPkey(
                remoteEvent,
                new_remote_pkeys[remoteEvent.objtype],
                remote_data,
                remote_data_complete,
            )
            # Work on a copy: the queued event is left untouched
            newRemoteEvent = deepcopy(remoteEvent)
            newRemoteEvent.objpkey = newpkey

        # Local event. Kept as is when its objtype has no pkey update
        newLocalEvent = localEvent
        if localEvent.objtype in new_local_pkeys:
            pkeyattrs = new_local_pkeys[localEvent.objtype]
            newpkey = computeNewPkey(
                localEvent, pkeyattrs, local_data, local_data_complete
            )
            newLocalEvent = deepcopy(localEvent)
            newLocalEvent.objpkey = newpkey

            # Update reserved _pkey* attributes in added events
            if newLocalEvent.eventtype == "added":
                # Remove previous pkey attributes
                for attr in list(newLocalEvent.objattrs.keys()):
                    if attr.startswith("_pkey_"):
                        del newLocalEvent.objattrs[attr]
                # Add new pkey attributes
                if isinstance(pkeyattrs, tuple):
                    for pkattr, pkvalue in zip(pkeyattrs, newpkey):
                        newLocalEvent.objattrs[pkattr] = pkvalue
                else:
                    newLocalEvent.objattrs[pkeyattrs] = newpkey

        # Save modified entry
        newqueue[eventNumber] = (newRemoteEvent, newLocalEvent, errorMsg)

    self._queue = newqueue