Coverage for clients/errorqueue.py: 97%

326 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:25 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from copy import deepcopy 

24from typing import Any, Iterable 

25 

26from lib.datamodel.event import Event 

27from lib.datamodel.dataobject import DataObject 

28from lib.datamodel.datasource import Datasource 

29from lib.datamodel.foreignkey import ForeignKey 

30from lib.datamodel.serialization import LocalCache 

31 

32 

33class HermesInvalidErrorQueueJSONError(Exception): 

34 """Raised when trying to import an ErrorQueue from json with invalid JSON data""" 

35 

36 

37class ErrorQueue(LocalCache): 

38 """Store and manage an indexed event queue. Useful for retrying Event in error""" 

39 

40 def __init__( 

41 self, 

42 typesMapping: dict[str, str], 

43 remotedata: Datasource | None = None, 

44 remotedata_complete: Datasource | None = None, 

45 localdata: Datasource | None = None, 

46 localdata_complete: Datasource | None = None, 

47 from_json_dict: dict[str, Any] | None = None, 

48 autoremediate: str = "disabled", 

49 ): 

50 """Create an empty Event queue, or load it from specified 'from_json_dict'""" 

51 

52 if autoremediate in ("conservative", "maximum"): 

53 self._autoremediate: str | None = autoremediate 

54 """If set, indicate the policy to use for autoremediation""" 

55 else: 

56 self._autoremediate = None 

57 

58 self._queue: dict[int, tuple[Event | None, Event, str | None]] = {} 

59 """The event queue, key is a unique integer, value is a tuple with the 

60 remote event (or None if the entry was generated by a "pure" local event, 

61 eg. a change in the client datamodel), the local event, and a string 

62 containing an optional error message""" 

63 

64 self._index: dict[str, dict[Any, set[int]]] = {} 

65 """Index table of events. 

66 The keys are 

67 1. the local event object type (str) 

68 2. the event object primary key (Any) 

69 The value is a set containing all eventNumber in queue for the keys 

70 

71 self._index[localevent.objtype][localevent.objpkey] = 

72 set([eventNumber1, eventNumber2, ...]) 

73 """ 

74 

75 self._parentObjs: dict[str, dict[Any, set[int]]] = {} 

76 """Index of parent objects of objects in errors, with the eventNumber 

77 list that have stored them in this state. 

78 The keys are 

79 1. the local object type (str) 

80 2. the object primary key (Any) 

81 The value is a set containing all eventNumber in queue for the keys 

82 

83 self._parentObjs[objtype][objpkey] = set([eventNumber1, eventNumber2, ...]) 

84 """ 

85 

86 self._typesMapping = { 

87 "local": {v: k for k, v in typesMapping.items()}, 

88 "remote": {k: v for k, v in typesMapping.items()}, 

89 } 

90 """Mapping between local and remote objects types 

91 - self._typesMapping["local"]["local_type"] return the corresponding remote 

92 type 

93 - self._typesMapping["remote"]["remote_type"] return the corresponding 

94 local type 

95 """ 

96 

97 super().__init__(jsondataattr=["_queue"]) 

98 

99 self.updateDatasources( 

100 remotedata, remotedata_complete, localdata, localdata_complete 

101 ) 

102 

103 if from_json_dict: 

104 if from_json_dict.keys() != set(["_queue"]): 

105 raise HermesInvalidErrorQueueJSONError(f"{from_json_dict=}") 

106 else: 

107 # Prevent changes on deep references of from_json_dict 

108 from_json = deepcopy(from_json_dict) 

109 for eventNumber, ( 

110 remoteEventDict, 

111 localEventDict, 

112 errorMsg, 

113 ) in from_json["_queue"].items(): 

114 if remoteEventDict is None: 

115 remoteEvent = None 

116 else: 

117 remoteEvent = Event(from_json_dict=remoteEventDict) 

118 localEvent = Event(from_json_dict=localEventDict) 

119 self._append(remoteEvent, localEvent, errorMsg, int(eventNumber)) 

120 

121 def updateDatasources( 

122 self, 

123 remotedata: Datasource | None = None, 

124 remotedata_complete: Datasource | None = None, 

125 localdata: Datasource | None = None, 

126 localdata_complete: Datasource | None = None, 

127 ): 

128 """Update the references to the datasources used, required for a case in 

129 autoremediation""" 

130 self._remotedata: Datasource | None = remotedata 

131 self._remotedata_complete: Datasource | None = remotedata_complete 

132 self._localdata: Datasource | None = localdata 

133 self._localdata_complete: Datasource | None = localdata_complete 

134 

135 # Reset parentObjs and refill it 

136 self._parentObjs = {} 

137 for eventNumber in self._queue.keys(): 

138 self._addParentObjs(eventNumber) 

139 

140 def append( 

141 self, remoteEvent: Event | None, localEvent: Event | None, errorMsg: str | None 

142 ): 

143 """Append specified event to queue""" 

144 self._append( 

145 remoteEvent, localEvent, errorMsg, 1 + max(self._queue.keys(), default=0) 

146 ) 

147 

148 def _append( 

149 self, 

150 remoteEvent: Event | None, 

151 localEvent: Event | None, 

152 errorMsg: str | None, 

153 eventNumber: int, 

154 ): 

155 """Append specified event to queue, at specified eventNumber""" 

156 if eventNumber in self._queue: 

157 raise IndexError(f"Specified {eventNumber=} already exist in queue") 

158 

159 if ( 

160 remoteEvent is not None 

161 and remoteEvent.objtype not in self._typesMapping["remote"] 

162 ): 

163 __hermes__.logger.info( 

164 "Ignore loading of remote event of unknown objtype" 

165 f" {remoteEvent.objtype}" 

166 ) 

167 return 

168 

169 if localEvent.objtype not in self._typesMapping["local"]: 

170 __hermes__.logger.info( 

171 f"Ignore loading of local event of unknown objtype {localEvent.objtype}" 

172 ) 

173 return 

174 

175 self._queue[eventNumber] = (remoteEvent, localEvent, errorMsg) 

176 self._addEventToIndex(eventNumber) 

177 self._addParentObjs(eventNumber) 

178 

179 if self._autoremediate: 

180 self._remediateWithPrevious(eventNumber) 

181 

182 def _mergeEvents( 

183 self, 

184 prevEvent: Event | None, 

185 lastEvent: Event | None, 

186 datasource: Datasource | None, 

187 datasource_complete: Datasource | None, 

188 previousEvents: list[Event], 

189 ) -> tuple[bool, Event | None]: 

190 """Merge two events for remediation, and returns the result as a tuple with 

191 - wasMerged : a boolean indicating that the merge was done, meaning that the 

192 last event must be removed. If False, the values of removeBothEvents and 

193 newEvent in tuple must be ignored 

194 - removeBothEvents: a boolean indicating that the merge consists of both events 

195 removal. If True, the value of newEvent must be ignored 

196 - newEvent: the merged Event 

197 """ 

198 # Handle None values, that may only occurs for remote events 

199 if lastEvent is None and prevEvent is None: 

200 # No data : merging is easy 

201 __hermes__.logger.info("Merging two None events, result is None") 

202 return (True, False, None) 

203 elif lastEvent is None: 

204 # Keep prevEvent values 

205 __hermes__.logger.info( 

206 f"Merging {prevEvent.objattrs=} with lastEvent=None," 

207 f" result is {prevEvent=}" 

208 ) 

209 return (True, False, prevEvent) 

210 elif prevEvent is None: 

211 # Keep lastEvent values 

212 __hermes__.logger.info( 

213 f"Merging prevEvent=None with {lastEvent.objattrs=}," 

214 f" result is {lastEvent=}" 

215 ) 

216 return (True, False, lastEvent) 

217 

218 if ( 

219 (prevEvent.eventtype == "added" and lastEvent.eventtype == "added") 

220 or (prevEvent.eventtype == "removed" and lastEvent.eventtype == "modified") 

221 or (prevEvent.eventtype == "removed" and lastEvent.eventtype == "removed") 

222 or (prevEvent.eventtype == "modified" and lastEvent.eventtype == "added") 

223 ): 

224 errmsg = ( 

225 f"BUG : trying to merge a {lastEvent.eventtype} event with a" 

226 f" previous {prevEvent.eventtype} event, this should never happen." 

227 f" {lastEvent=} {prevEvent=}" 

228 ) 

229 __hermes__.logger.critical(errmsg) 

230 raise AssertionError(errmsg) 

231 

232 elif prevEvent.eventtype == "added" and lastEvent.eventtype == "modified": 

233 # Merge the modified event data into the added event 

234 mergedEvent = deepcopy(prevEvent) 

235 objattrs = mergedEvent.objattrs 

236 objattrs.update(lastEvent.objattrs.get("added", dict())) 

237 objattrs.update(lastEvent.objattrs.get("modified", dict())) 

238 for key in ( 

239 objattrs.keys() & lastEvent.objattrs.get("removed", dict()).keys() 

240 ): 

241 del objattrs[key] 

242 

243 __hermes__.logger.info( 

244 f"Merging added {prevEvent.objattrs=} with modified" 

245 f" {lastEvent.objattrs=}, result is added {objattrs=}" 

246 ) 

247 return (True, False, mergedEvent) 

248 elif prevEvent.eventtype == "added" and lastEvent.eventtype == "removed": 

249 if self._autoremediate == "maximum": 

250 # Remove the two events 

251 return (True, True, None) 

252 # Use "conservative" as fallback : don't merge the events 

253 return (False, False, None) 

254 elif prevEvent.eventtype == "removed" and lastEvent.eventtype == "added": 

255 if self._autoremediate == "maximum": 

256 # Ensure required data to process is available, otherwise fallback to 

257 # "conservative" policy 

258 if datasource is None: 

259 __hermes__.logger.info( 

260 f"Unable to merge removed {prevEvent=} with added " 

261 f"{lastEvent.objattrs=}, as no datasource is available. " 

262 "Fallback to 'conservative' mode." 

263 ) 

264 else: 

265 currentObj: DataObject | None = datasource.get( 

266 lastEvent.objtype, {} 

267 ).get(lastEvent.objpkey, None) 

268 

269 newObj: DataObject | None = datasource_complete.get( 

270 lastEvent.objtype, {} 

271 ).get(lastEvent.objpkey, None) 

272 

273 if len(previousEvents) != 0: 

274 # There are previous unprocesed events in error queue. 

275 # Merging is possible but first we have to apply previous 

276 # events changes to (a copy of) currentObj in order to 

277 # determine object's state before prevEvent 

278 

279 # Imported here to avoid circular dependency 

280 from .datamodel import Datamodel 

281 

282 currentObj = deepcopy(currentObj) 

283 ev: Event 

284 for ev in previousEvents: 

285 if ev is None: 

286 continue 

287 elif ev.eventtype == "added": 

288 currentObj = Datamodel.createDataobject( 

289 datasource.schema, ev.objtype, ev.objattrs 

290 ) 

291 elif ev.eventtype == "modified": 

292 if currentObj is None: 

293 # Should never occur 

294 errmsg = ( 

295 "BUG : unexpected object status met when trying" 

296 f" to merge two events {lastEvent=}" 

297 f" {lastEvent.eventtype=} ; {prevEvent=}" 

298 f" {prevEvent.eventtype=}" 

299 ) 

300 __hermes__.logger.critical(errmsg) 

301 raise AssertionError(errmsg) 

302 currentObj = Datamodel.getUpdatedObject( 

303 currentObj, ev.objattrs 

304 ) 

305 elif ev.eventtype == "removed": 

306 # Should never occur 

307 currentObj = None 

308 

309 if currentObj is None or newObj is None: 

310 __hermes__.logger.warning( 

311 f"BUG ? - Unable to merge removed {prevEvent=} with added " 

312 f"{lastEvent.objattrs=}, as related object was not found " 

313 f"in caches. {currentObj=} {newObj=}" 

314 ) 

315 else: 

316 # Diff the desired object with current one to generate a 

317 # modified event 

318 mergedEvent, _ = Event.fromDiffItem( 

319 newObj.diffFrom(currentObj), "base", "modified" 

320 ) 

321 

322 if ( 

323 mergedEvent.objattrs["added"] 

324 or mergedEvent.objattrs["modified"] 

325 or mergedEvent.objattrs["removed"] 

326 ): 

327 __hermes__.logger.info( 

328 f"Merging removed {prevEvent=} with added" 

329 f" {lastEvent.objattrs=}, result is modified" 

330 f" {mergedEvent=} {mergedEvent.objattrs=}" 

331 ) 

332 return (True, False, mergedEvent) 

333 else: 

334 __hermes__.logger.info( 

335 f"Merging removed {prevEvent=} with added " 

336 f"{lastEvent.objattrs=}, result is an empty modified" 

337 " event (without any change). Ignoring it" 

338 ) 

339 return (True, True, mergedEvent) 

340 

341 # Use "conservative" as fallback : don't merge the events 

342 return (False, False, None) 

343 elif prevEvent.eventtype == "modified" and lastEvent.eventtype == "modified": 

344 # Merge the two modified events 

345 mergedEvent = deepcopy(prevEvent) 

346 objattrs = mergedEvent.objattrs 

347 

348 lastAdded: dict[str, Any] = lastEvent.objattrs.get("added", dict()) 

349 lastModified: dict[str, Any] = lastEvent.objattrs.get("modified", dict()) 

350 lastRemoved: dict[str, Any] = lastEvent.objattrs.get("removed", dict()) 

351 

352 # Newly added attributes 

353 for attr, newval in lastAdded.items(): 

354 # # Newly added should not exist in prev modified 

355 # if attr in objattrs["modified"]: 

356 # raise AssertionError 

357 # Merge prev added with last added 

358 objattrs["added"][attr] = newval 

359 # As attr is now added, it should not be removed anymore 

360 if attr in objattrs["removed"]: 

361 del objattrs["removed"][attr] 

362 

363 # Newly modified attributes 

364 for attr, newval in lastModified.items(): 

365 # # Newly modified should not exist in prev removed 

366 # if attr in objattrs["removed"]: 

367 # raise AssertionError 

368 # If was added in prev, keep it as added, but update its value 

369 if attr in objattrs["added"]: 

370 objattrs["added"][attr] = newval 

371 else: 

372 # Keep or update the last modified value 

373 objattrs["modified"][attr] = newval 

374 

375 # Newly removed attributes 

376 for attr, newval in lastRemoved.items(): 

377 # # Newly removed should not exist in prev removed 

378 # if attr in objattrs["removed"]: 

379 # raise AssertionError 

380 if attr in objattrs["added"]: 

381 # Added + removed : nothing to keep 

382 del objattrs["added"][attr] 

383 else: 

384 if attr in objattrs["modified"]: 

385 # Modified + removed : don't keep the modified 

386 del objattrs["modified"][attr] 

387 # Keep the removed 

388 objattrs["removed"][attr] = newval 

389 

390 __hermes__.logger.info( 

391 f"Merging modified {prevEvent.objattrs=} with modified" 

392 f" {lastEvent.objattrs=}, result is modified {objattrs=}" 

393 ) 

394 return (True, False, mergedEvent) 

395 elif prevEvent.eventtype == "modified" and lastEvent.eventtype == "removed": 

396 if self._autoremediate == "maximum": 

397 # Remove prevEvent 

398 __hermes__.logger.info( 

399 f"Merging modified {prevEvent.objattrs=} with removed" 

400 f" {lastEvent.objattrs=}, result is removed {lastEvent=}" 

401 ) 

402 return (True, False, lastEvent) 

403 # Use "conservative" as fallback : don't merge the events 

404 return (False, False, None) 

405 else: 

406 errmsg = ( 

407 "BUG : unexpected eventtype met when trying to merge two events " 

408 f"{lastEvent=} {lastEvent.eventtype=}" 

409 f" ; {prevEvent=} {prevEvent.eventtype=}" 

410 ) 

411 __hermes__.logger.critical(errmsg) 

412 raise AssertionError(errmsg) 

413 

414 def _remediateWithPrevious(self, eventNumber: int): 

415 lastEventNumber = eventNumber 

416 (lastRemoteEvent, lastLocalEvent, lastErrorMsg) = self._queue[lastEventNumber] 

417 

418 allEventNumbers = sorted( 

419 self._index[lastLocalEvent.objtype][lastLocalEvent.objpkey] 

420 ) 

421 allEvents = [self._queue[evNum] for evNum in allEventNumbers] 

422 if len(allEvents) < 2: 

423 # No previous event to remediate with 

424 return 

425 

426 previousRemoteEvents = [i[0] for i in allEvents[:-2]] 

427 previousLocalEvents = [i[1] for i in allEvents[:-2]] 

428 

429 prevEventNumber = allEventNumbers[-2] 

430 (prevRemoteEvent, prevLocalEvent, prevErrorMsg) = allEvents[-2] 

431 

432 # Can't merge partially processed events 

433 if ( 

434 prevLocalEvent.isPartiallyProcessed 

435 or lastLocalEvent.isPartiallyProcessed 

436 or (prevRemoteEvent is not None and prevRemoteEvent.isPartiallyProcessed) 

437 or (lastRemoteEvent is not None and lastRemoteEvent.isPartiallyProcessed) 

438 ): 

439 stepsvalues = [] 

440 if prevRemoteEvent is not None: 

441 stepsvalues.append(f"{prevRemoteEvent.isPartiallyProcessed=}") 

442 else: 

443 stepsvalues.append(f"{prevRemoteEvent=}") 

444 stepsvalues.append(f"{prevLocalEvent.isPartiallyProcessed=}") 

445 if lastRemoteEvent is not None: 

446 stepsvalues.append(f"{lastRemoteEvent.isPartiallyProcessed=}") 

447 else: 

448 stepsvalues.append(f"{lastRemoteEvent=}") 

449 stepsvalues.append(f"{lastLocalEvent.isPartiallyProcessed=}") 

450 

451 __hermes__.logger.info( 

452 "Unable to merge two events of which at least one has already been" 

453 f" partially processed. {' '.join(stepsvalues)}" 

454 ) 

455 return 

456 

457 (remotedWasMerged, remoteRemoveBothEvents, newRemoteEvent) = self._mergeEvents( 

458 prevRemoteEvent, 

459 lastRemoteEvent, 

460 self._remotedata, 

461 self._remotedata_complete, 

462 previousRemoteEvents, 

463 ) 

464 (localWasMerged, localRemoveBothEvents, newLocalEvent) = self._mergeEvents( 

465 prevLocalEvent, 

466 lastLocalEvent, 

467 self._localdata, 

468 self._localdata_complete, 

469 previousLocalEvents, 

470 ) 

471 

472 if remotedWasMerged != localWasMerged: 

473 errmsg = ( 

474 "BUG : inconsistency between remote and local merge results :" 

475 f" {remotedWasMerged=}, {localWasMerged=}. {prevEventNumber=}," 

476 f" {lastEventNumber=} {prevRemoteEvent.toString(set())=}" 

477 f" {lastRemoteEvent.toString(set())=}," 

478 f" {prevLocalEvent.toString(set())=}, {lastLocalEvent.toString(set())=}" 

479 ) 

480 __hermes__.logger.critical(errmsg) 

481 raise AssertionError(errmsg) 

482 

483 if not localWasMerged: 

484 # No merge was done 

485 return 

486 

487 # Local processing result is the only one that really matters here 

488 if localRemoveBothEvents: 

489 self.remove(lastEventNumber) 

490 self.remove(prevEventNumber) 

491 else: 

492 # Last event was merged into previous, update previous and remove last from 

493 # queue 

494 self._queue[prevEventNumber] = (newRemoteEvent, newLocalEvent, prevErrorMsg) 

495 self.remove(lastEventNumber) 

496 

497 def _addEventToIndex(self, eventNumber: int): 

498 """Add specified event to index""" 

499 if eventNumber not in self._queue: 

500 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue") 

501 

502 remoteEvent: Event | None 

503 localEvent: Event 

504 remoteEvent, localEvent, errorMsg = self._queue[eventNumber] 

505 

506 objtype = localEvent.objtype 

507 

508 # Create objtype sublevel if it doesn't exist yet 

509 if objtype not in self._index: 

510 self._index[objtype] = {} 

511 

512 if localEvent.objpkey not in self._index[objtype]: 

513 # Create the set with specified eventNumber 

514 self._index[objtype][localEvent.objpkey] = set([eventNumber]) 

515 else: 

516 # Add specified eventNumber to the set 

517 self._index[objtype][localEvent.objpkey].add(eventNumber) 

518 

519 def _addParentObjs(self, eventNumber: int): 

520 """Index parent objects of specified event""" 

521 if self._localdata is None: 

522 # Unable to proceed without a datasource 

523 return 

524 

525 if eventNumber not in self._queue: 

526 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue") 

527 

528 remoteEvent: Event | None 

529 localEvent: Event 

530 remoteEvent, localEvent, errorMsg = self._queue[eventNumber] 

531 

532 fromobj: DataObject | None = self._localdata.get(localEvent.objtype, {}).get( 

533 localEvent.objpkey, None 

534 ) 

535 if fromobj is None: 

536 fromobj = self._localdata_complete.get(localEvent.objtype, {}).get( 

537 localEvent.objpkey, None 

538 ) 

539 if fromobj is None: 

540 return 

541 

542 parents = ForeignKey.fetchParentObjs(self._localdata, fromobj) 

543 

544 for parent in parents: 

545 objtype = parent.getType() 

546 # Create objtype sublevel if it doesn't exist yet 

547 if objtype not in self._parentObjs: 

548 self._parentObjs[objtype] = {} 

549 

550 if parent.getPKey() not in self._parentObjs[objtype]: 

551 # Create the set with specified eventNumber 

552 self._parentObjs[objtype][parent.getPKey()] = set([eventNumber]) 

553 else: 

554 # Add specified eventNumber to the set 

555 self._parentObjs[objtype][parent.getPKey()].add(eventNumber) 

556 

557 def updateErrorMsg(self, eventNumber: int, errorMsg: str): 

558 """Update errorMsg of specified eventNumber""" 

559 if eventNumber not in self._queue: 

560 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue") 

561 

562 remoteEvent: Event | None 

563 localEvent: Event 

564 remoteEvent, localEvent, oldErrorMsg = self._queue[eventNumber] 

565 self._queue[eventNumber] = (remoteEvent, localEvent, errorMsg) 

566 

567 def remove(self, eventNumber: int, ignoreMissingEventNumber=False): 

568 """Remove event of specified eventNumber from queue""" 

569 if eventNumber not in self._queue: 

570 if ignoreMissingEventNumber: 

571 return 

572 else: 

573 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue") 

574 

575 remoteEvent: Event | None 

576 localEvent: Event 

577 remoteEvent, localEvent, errorMsg = self._queue[eventNumber] 

578 

579 del self._queue[eventNumber] # Remove data from queue 

580 

581 objtype = localEvent.objtype 

582 

583 # Remove from index 

584 self._index[objtype][localEvent.objpkey].remove(eventNumber) 

585 

586 # Purge index uplevels when empty 

587 if not self._index[objtype][localEvent.objpkey]: 

588 del self._index[objtype][localEvent.objpkey] 

589 

590 if not self._index[objtype]: 

591 del self._index[objtype] 

592 

593 # Remove all references of specified eventNumber from _parentObjs 

594 for objtype in tuple(self._parentObjs.keys()): 

595 for objpkey in tuple(self._parentObjs[objtype].keys()): 

596 # Remove from parentObjs 

597 self._parentObjs[objtype][objpkey].discard(eventNumber) 

598 

599 # Purge index uplevels when empty 

600 if not self._parentObjs[objtype][objpkey]: 

601 del self._parentObjs[objtype][objpkey] 

602 

603 if not self._parentObjs[objtype]: 

604 del self._parentObjs[objtype] 

605 

606 def __iter__(self) -> Iterable: 

607 """Returns an iterator of current instance events to process 

608 It will ignore events about an object that still have older event in queue 

609 

610 Each entry will contains 4 values: 

611 1. eventNumber: int 

612 2. remoteEvent: remote event, or None if the entry was generated by a 

613 "pure" local event (eg. a change in the client datamodel) 

614 3. localEvent: local event 

615 4. errorMsg: str or None 

616 """ 

617 eventNumber: int 

618 remoteEvent: Event | None 

619 localEvent: Event 

620 errorMsg: str | None 

621 

622 for eventNumber in list(self._queue.keys()): 

623 # Event may have been removed during iteration 

624 # e.g. a call to purgeAllEventsOfDataObject() may remove several events 

625 if eventNumber not in self._queue: 

626 continue 

627 

628 remoteEvent, localEvent, errorMsg = self._queue[eventNumber] 

629 objindex = self._index[localEvent.objtype][localEvent.objpkey] 

630 

631 # If current event isn't the first of its object index, ignore it 

632 # because the previous must be processed before 

633 if eventNumber != min(objindex): 

634 continue 

635 

636 yield (eventNumber, remoteEvent, localEvent, errorMsg) 

637 

638 def allEvents(self) -> Iterable: 

639 """Returns an iterator of all current instance events 

640 

641 Each entry will contains 4 values: 

642 1. eventNumber: int 

643 2. remoteEvent: remote event, or None if the entry was generated by a 

644 "pure" local event (eg. a change in the client datamodel) 

645 3. localEvent: local event 

646 4. errorMsg: str or None 

647 """ 

648 eventNumber: int 

649 remoteEvent: Event | None 

650 localEvent: Event 

651 errorMsg: str | None 

652 

653 for eventNumber in list(self._queue.keys()): 

654 # Event may have been removed during iteration 

655 # e.g. a call to purgeAllEventsOfDataObject() may remove several events 

656 if eventNumber not in self._queue: 

657 continue 

658 

659 remoteEvent, localEvent, errorMsg = self._queue[eventNumber] 

660 yield (eventNumber, remoteEvent, localEvent, errorMsg) 

661 

662 def __len__(self) -> int: 

663 """Returns the number of Event in queue""" 

664 return len(self._queue) 

665 

666 def keys(self) -> set[int]: 

667 """Returns a set with every event number in current instance""" 

668 return set(self._queue.keys()) 

669 

670 def _getLocalObjtype(self, objtype: str, isLocalObjtype: bool) -> str | None: 

671 """Returns the local objtype, or None if it doesn't exist""" 

672 if isLocalObjtype: 

673 if objtype in self._typesMapping["local"]: 

674 return objtype 

675 else: 

676 return None 

677 else: 

678 return self._typesMapping["remote"].get(objtype) 

679 

680 def containsObject(self, objtype: str, objpkey: Any, isLocalObjtype: bool) -> bool: 

681 """Indicate if object of specified objpkey of specified objtype exists in 

682 current instance""" 

683 l_objtype = self._getLocalObjtype(objtype, isLocalObjtype) 

684 if l_objtype is None: 

685 return False 

686 return l_objtype in self._index and objpkey in self._index[l_objtype] 

687 

688 def containsObjectByEvent(self, event: Event, isLocalEvent: bool) -> bool: 

689 """Indicate if object of specified event exists in current instance""" 

690 return self.containsObject(event.objtype, event.objpkey, isLocalEvent) 

691 

692 def isEventAParentOfAnotherError(self, event: Event, isLocalEvent: bool) -> bool: 

693 """Indicate if object of specified event is a parent (a foreign key) of 

694 an object that exists in current instance""" 

695 l_objtype = self._getLocalObjtype(event.objtype, isLocalEvent) 

696 if l_objtype is None: 

697 return False 

698 parentEvs = self._parentObjs.get(l_objtype, {}).get(event.objpkey) 

699 return parentEvs is not None 

700 

701 def purgeAllEvents(self, objtype: str, objpkey: Any, isLocalObjtype: bool): 

702 """Delete all events of specified objpkey of specified objtype from current 

703 instance""" 

704 if not self.containsObject(objtype, objpkey, isLocalObjtype): 

705 return 

706 

707 l_objtype = self._getLocalObjtype(objtype, isLocalObjtype) 

708 if l_objtype is None: 

709 return 

710 

711 eventNumbers = self._index[l_objtype][objpkey] 

712 

713 # Loop over a copy as content may be removed during iteration 

714 for eventNumber in eventNumbers.copy(): 

715 self.remove(eventNumber) 

716 

717 def purgeAllEventsOfDataObject(self, obj: DataObject, isLocalObjtype: bool): 

718 """Delete all events of specified objpkey of specified objtype from current 

719 instance""" 

720 self.purgeAllEvents(obj.getType(), obj.getPKey(), isLocalObjtype) 

721 

722 def updatePrimaryKeys( 

723 self, 

724 new_remote_pkeys: dict[str, str], 

725 remote_data: Datasource, 

726 remote_data_complete: Datasource, 

727 new_local_pkeys: dict[str, str], 

728 local_data: Datasource, 

729 local_data_complete: Datasource, 

730 ): 

731 """Will update primary keys. new_remote_pkeys is a dict with remote objtype as 

732 key, and the new remote primary key attribute name as value. new_local_pkeys is 

733 a dict with local objtype as key, and the new local primary key attribute name 

734 as value. 

735 The specified datasources must not have their primary keys updated yet, to 

736 allow conversion. 

737 

738 The ErrorQueue MUST immediately be saved and re-instantiated from cache by 

739 caller to reflect data changes. 

740 """ 

741 newqueue = {} 

742 for eventNumber, (remoteEvent, localEvent, errorMsg) in self._queue.items(): 

743 # Remote event, if any 

744 if remoteEvent is None: 

745 newRemoteEvent = None 

746 else: 

747 if remoteEvent.objtype not in new_remote_pkeys.keys(): 

748 # Objtype of remote event has no pkey update 

749 newRemoteEvent = remoteEvent 

750 else: 

751 oldobj = remote_data[remoteEvent.objtype].get(remoteEvent.objpkey) 

752 if oldobj is None: 

753 oldobj = remote_data_complete[remoteEvent.objtype][ 

754 remoteEvent.objpkey 

755 ] 

756 if type(new_remote_pkeys[remoteEvent.objtype]) is tuple: 

757 # New pkey is a tuple, loop over each attr 

758 newpkey = [] 

759 for pkattr in new_remote_pkeys[remoteEvent.objtype]: 

760 newpkey.append(getattr(oldobj, pkattr)) 

761 newpkey = tuple(newpkey) 

762 else: 

763 newpkey = getattr(oldobj, new_remote_pkeys[remoteEvent.objtype]) 

764 

765 # Save modified remote event 

766 newRemoteEvent = deepcopy(remoteEvent) 

767 newRemoteEvent.objpkey = newpkey 

768 

769 # Local event 

770 if localEvent.objtype not in new_local_pkeys.keys(): 

771 # Objtype of local event has no pkey update 

772 newLocalEvent = localEvent 

773 else: 

774 oldobj = local_data[localEvent.objtype].get(localEvent.objpkey) 

775 if oldobj is None: 

776 oldobj = local_data_complete[localEvent.objtype][localEvent.objpkey] 

777 if type(new_local_pkeys[localEvent.objtype]) is tuple: 

778 # New pkey is a tuple, loop over each attr 

779 newpkey = [] 

780 for pkattr in new_local_pkeys[localEvent.objtype]: 

781 newpkey.append(getattr(oldobj, pkattr)) 

782 newpkey = tuple(newpkey) 

783 else: 

784 newpkey = getattr(oldobj, new_local_pkeys[localEvent.objtype]) 

785 

786 # Save modified local event 

787 newLocalEvent = deepcopy(localEvent) 

788 newLocalEvent.objpkey = newpkey 

789 

790 # Update reserved _pkey* attributes in added events 

791 if newLocalEvent.eventtype == "added": 

792 # Remove previous pkey attributes 

793 for attr in list(newLocalEvent.objattrs.keys()): 

794 if attr.startswith("_pkey_"): 

795 del newLocalEvent.objattrs[attr] 

796 # Add new pkey attributes 

797 if type(new_local_pkeys[localEvent.objtype]) is tuple: 

798 for i in range(len(newpkey)): 

799 newLocalEvent.objattrs[ 

800 new_local_pkeys[localEvent.objtype][i] 

801 ] = newpkey[i] 

802 else: 

803 newLocalEvent.objattrs[new_local_pkeys[localEvent.objtype]] = ( 

804 newpkey 

805 ) 

806 

807 # Save modified entry 

808 newqueue[eventNumber] = (newRemoteEvent, newLocalEvent, errorMsg) 

809 

810 self._queue = newqueue