Coverage for clients / errorqueue.py: 97%
337 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:16 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
from copy import deepcopy
from typing import Any, Iterable, Iterator

from lib.datamodel.dataobject import DataObject
from lib.datamodel.datasource import Datasource
from lib.datamodel.event import Event
from lib.datamodel.foreignkey import ForeignKey
from lib.datamodel.serialization import LocalCache
class HermesInvalidErrorQueueJSONError(Exception):
    """Signal that the JSON data given to build an ErrorQueue is not valid.

    Raised by the ErrorQueue constructor when the provided 'from_json_dict'
    does not contain exactly the expected keys.
    """
class ErrorQueue(LocalCache):
    """Store and manage an indexed event queue. Useful for retrying Event in error.

    Each queue entry is identified by a unique integer event number and holds a
    tuple (remoteEvent, localEvent, errorMsg). Two secondary indexes are
    maintained alongside the queue:
    - self._index: event numbers by local object type and primary key
    - self._parentObjs: event numbers by parent (foreign key) object of the
      objects in error

    Only the '_queue' attribute is declared to LocalCache for serialization; the
    indexes are rebuilt when entries are (re)appended.
    """

    def __init__(
        self,
        typesMapping: dict[str, list[str]],
        remotedata: Datasource | None = None,
        remotedata_complete: Datasource | None = None,
        localdata: Datasource | None = None,
        localdata_complete: Datasource | None = None,
        from_json_dict: dict[str, Any] | None = None,
        autoremediate: str = "disabled",
    ):
        """Create an empty Event queue, or load it from specified 'from_json_dict'

        - typesMapping: dict with remote object types as keys, and the list of
          corresponding local object types as values
        - remotedata, remotedata_complete, localdata, localdata_complete:
          datasources forwarded to updateDatasources()
        - from_json_dict: if set (and not empty), must contain the single key
          "_queue", otherwise HermesInvalidErrorQueueJSONError is raised
        - autoremediate: autoremediation policy. Only "conservative" and
          "maximum" enable it, any other value disables it
        """
        self._autoremediate: str | None = (
            autoremediate if autoremediate in ("conservative", "maximum") else None
        )
        """If set, indicate the policy to use for autoremediation"""

        self._queue: dict[int, tuple[Event | None, Event, str | None]] = {}
        """The event queue, key is a unique integer, value is a tuple with the
        remote event (or None if the entry was generated by a "pure" local event,
        eg. a change in the client datamodel), the local event, and a string
        containing an optional error message"""

        self._index: dict[str, dict[Any, set[int]]] = {}
        """Index table of events.
        The keys are
          1. the local event object type (str)
          2. the event object primary key (Any)
        The value is a set containing all eventNumber in queue for the keys

        self._index[localevent.objtype][localevent.objpkey] =
            set([eventNumber1, eventNumber2, ...])
        """

        self._parentObjs: dict[str, dict[Any, set[int]]] = {}
        """Index of parent objects of objects in errors, with the eventNumber
        list that have stored them in this state.
        The keys are
          1. the local object type (str)
          2. the object primary key (Any)
        The value is a set containing all eventNumber in queue for the keys

        self._parentObjs[objtype][objpkey] = set([eventNumber1, eventNumber2, ...])
        """

        self._typesMapping: dict[str, dict[str, Any]] = {
            "local": {},
            "remote": {},
        }
        """Mapping between local and remote objects types
        - self._typesMapping["local"]["local_type"] return the corresponding remote
          type
        - self._typesMapping["remote"]["remote_type"] return the corresponding
          local types
        """
        for remoteobj, localobjs in typesMapping.items():
            for localobj in localobjs:
                self._typesMapping["local"][localobj] = remoteobj
            self._typesMapping["remote"][remoteobj] = localobjs

        super().__init__(jsondataattr=["_queue"])

        self.updateDatasources(
            remotedata, remotedata_complete, localdata, localdata_complete
        )

        if not from_json_dict:
            return

        if from_json_dict.keys() != {"_queue"}:
            raise HermesInvalidErrorQueueJSONError(f"{from_json_dict=}")

        # Prevent changes on deep references of from_json_dict
        from_json = deepcopy(from_json_dict)
        for eventNumber, (
            remoteEventDict,
            localEventDict,
            errorMsg,
        ) in from_json["_queue"].items():
            remoteEvent = (
                None
                if remoteEventDict is None
                else Event(from_json_dict=remoteEventDict)
            )
            localEvent = Event(from_json_dict=localEventDict)
            # Event numbers are converted with int() before being used as keys
            self._append(remoteEvent, localEvent, errorMsg, int(eventNumber))

    def updateDatasources(
        self,
        remotedata: Datasource | None = None,
        remotedata_complete: Datasource | None = None,
        localdata: Datasource | None = None,
        localdata_complete: Datasource | None = None,
    ):
        """Update the references to the datasources used, required for a case in
        autoremediation. The parent objects index is rebuilt from scratch, as it
        is computed from the local datasources"""
        self._remotedata: Datasource | None = remotedata
        self._remotedata_complete: Datasource | None = remotedata_complete
        self._localdata: Datasource | None = localdata
        self._localdata_complete: Datasource | None = localdata_complete

        # Reset parentObjs and refill it
        self._parentObjs = {}
        for eventNumber in self._queue:
            self._addParentObjs(eventNumber)

    def append(
        self, remoteEvent: Event | None, localEvent: Event, errorMsg: str | None
    ):
        """Append specified event to queue, using the next free event number
        (highest event number in queue + 1, or 1 if the queue is empty)"""
        self._append(
            remoteEvent, localEvent, errorMsg, 1 + max(self._queue.keys(), default=0)
        )

    def _append(
        self,
        remoteEvent: Event | None,
        localEvent: Event,
        errorMsg: str | None,
        eventNumber: int,
    ):
        """Append specified event to queue, at specified eventNumber

        Entries whose remote or local objtype is unknown in the types mapping are
        ignored (an info message is logged). When autoremediation is enabled, a
        merge with the previous event of the same object is attempted.

        Raises IndexError if eventNumber already exists in queue.
        """
        if eventNumber in self._queue:
            raise IndexError(f"Specified {eventNumber=} already exist in queue")

        if (
            remoteEvent is not None
            and remoteEvent.objtype not in self._typesMapping["remote"]
        ):
            __hermes__.logger.info(
                "Ignore loading of remote event of unknown objtype"
                f" {remoteEvent.objtype}"
            )
            return

        if localEvent.objtype not in self._typesMapping["local"]:
            __hermes__.logger.info(
                f"Ignore loading of local event of unknown objtype {localEvent.objtype}"
            )
            return

        self._queue[eventNumber] = (remoteEvent, localEvent, errorMsg)
        self._addEventToIndex(eventNumber)
        self._addParentObjs(eventNumber)

        if self._autoremediate:
            self._remediateWithPrevious(eventNumber)

    def _mergeEvents(
        self,
        prevEvent: Event | None,
        lastEvent: Event | None,
        datasource: Datasource | None,
        datasource_complete: Datasource | None,
        previousEvents: list[Event | None],
    ) -> tuple[bool, bool, Event | None]:
        """Merge two events for remediation, and returns the result as a tuple with
        - wasMerged : a boolean indicating that the merge was done, meaning that the
          last event must be removed. If False, the values of removeBothEvents and
          newEvent in tuple must be ignored
        - removeBothEvents: a boolean indicating that the merge consists of both events
          removal. If True, the value of newEvent must be ignored
        - newEvent: the merged Event

        Arguments:
        - prevEvent: the older event, may be None for remote events
        - lastEvent: the newer event, may be None for remote events
        - datasource, datasource_complete: caches used to merge a "removed" event
          with a following "added" one in "maximum" policy. If one of them is
          missing, the "conservative" behavior is used for this case
        - previousEvents: events of the same object queued before prevEvent
          (entries may be None)

        Raises AssertionError when the combination of event types is not supposed
        to exist.
        """
        # Handle None values, that may only occurs for remote events
        if lastEvent is None and prevEvent is None:
            # No data : merging is easy
            __hermes__.logger.info("Merging two None events, result is None")
            return (True, False, None)
        if lastEvent is None:
            # Keep prevEvent values
            __hermes__.logger.info(
                f"Merging {prevEvent.objattrs=} with lastEvent=None,"
                f" result is {prevEvent=}"
            )
            return (True, False, prevEvent)
        if prevEvent is None:
            # Keep lastEvent values
            __hermes__.logger.info(
                f"Merging prevEvent=None with {lastEvent.objattrs=},"
                f" result is {lastEvent=}"
            )
            return (True, False, lastEvent)

        transition = (prevEvent.eventtype, lastEvent.eventtype)

        if transition in (
            ("added", "added"),
            ("removed", "modified"),
            ("removed", "removed"),
            ("modified", "added"),
        ):
            errmsg = (
                f"BUG : trying to merge a {lastEvent.eventtype} event with a"
                f" previous {prevEvent.eventtype} event, this should never happen."
                f" {lastEvent=} {prevEvent=}"
            )
            __hermes__.logger.critical(errmsg)
            raise AssertionError(errmsg)

        if transition == ("added", "modified"):
            # Merge the modified event data into the added event
            mergedEvent = deepcopy(prevEvent)
            objattrs = mergedEvent.objattrs
            objattrs.update(lastEvent.objattrs.get("added", dict()))
            objattrs.update(lastEvent.objattrs.get("modified", dict()))
            # The intersection is computed as a new set before looping, so
            # deleting keys from objattrs in the loop is safe
            for key in (
                objattrs.keys() & lastEvent.objattrs.get("removed", dict()).keys()
            ):
                del objattrs[key]

            __hermes__.logger.info(
                f"Merging added {prevEvent.objattrs=} with modified"
                f" {lastEvent.objattrs=}, result is added {objattrs=}"
            )
            return (True, False, mergedEvent)

        if transition == ("added", "removed"):
            if self._autoremediate == "maximum":
                # Remove the two events
                return (True, True, None)
            # Use "conservative" as fallback : don't merge the events
            return (False, False, None)

        if transition == ("removed", "added"):
            if self._autoremediate == "maximum":
                # Ensure required data to process is available, otherwise fallback to
                # "conservative" policy. Both caches are required below.
                if datasource is None or datasource_complete is None:
                    __hermes__.logger.info(
                        f"Unable to merge removed {prevEvent=} with added "
                        f"{lastEvent.objattrs=}, as no datasource is available. "
                        "Fallback to 'conservative' mode."
                    )
                else:
                    currentObj: DataObject | None = datasource.get(
                        lastEvent.objtype, {}
                    ).get(lastEvent.objpkey, None)

                    newObj: DataObject | None = datasource_complete.get(
                        lastEvent.objtype, {}
                    ).get(lastEvent.objpkey, None)

                    if len(previousEvents) != 0:
                        # There are previous unprocesed events in error queue.
                        # Merging is possible but first we have to apply previous
                        # events changes to (a copy of) currentObj in order to
                        # determine object's state before prevEvent

                        # Imported here to avoid circular dependency
                        from .datamodel import Datamodel

                        currentObj = deepcopy(currentObj)
                        ev: Event | None
                        for ev in previousEvents:
                            if ev is None:
                                continue
                            elif ev.eventtype == "added":
                                currentObj = Datamodel.createDataobject(
                                    datasource.schema, ev.objtype, ev.objattrs
                                )
                            elif ev.eventtype == "modified":
                                if currentObj is None:
                                    # Should never occur
                                    errmsg = (
                                        "BUG : unexpected object status met when trying"
                                        f" to merge two events {lastEvent=}"
                                        f" {lastEvent.eventtype=} ; {prevEvent=}"
                                        f" {prevEvent.eventtype=}"
                                    )
                                    __hermes__.logger.critical(errmsg)
                                    raise AssertionError(errmsg)
                                currentObj = Datamodel.getUpdatedObject(
                                    currentObj, ev.objattrs
                                )
                            elif ev.eventtype == "removed":
                                # Should never occur
                                currentObj = None

                    if currentObj is None or newObj is None:
                        __hermes__.logger.warning(
                            f"BUG ? - Unable to merge removed {prevEvent=} with added "
                            f"{lastEvent.objattrs=}, as related object was not found "
                            f"in caches. {currentObj=} {newObj=}"
                        )
                    else:
                        # Diff the desired object with current one to generate a
                        # modified event
                        mergedEvent, _ = Event.fromDiffItem(
                            newObj.diffFrom(currentObj), "base", "modified"
                        )

                        if (
                            mergedEvent.objattrs["added"]
                            or mergedEvent.objattrs["modified"]
                            or mergedEvent.objattrs["removed"]
                        ):
                            __hermes__.logger.info(
                                f"Merging removed {prevEvent=} with added"
                                f" {lastEvent.objattrs=}, result is modified"
                                f" {mergedEvent=} {mergedEvent.objattrs=}"
                            )
                            return (True, False, mergedEvent)

                        __hermes__.logger.info(
                            f"Merging removed {prevEvent=} with added "
                            f"{lastEvent.objattrs=}, result is an empty modified"
                            " event (without any change). Ignoring it"
                        )
                        return (True, True, mergedEvent)

            # Use "conservative" as fallback : don't merge the events
            return (False, False, None)

        if transition == ("modified", "modified"):
            # Merge the two modified events
            mergedEvent = deepcopy(prevEvent)
            objattrs = mergedEvent.objattrs

            lastAdded: dict[str, Any] = lastEvent.objattrs.get("added", dict())
            lastModified: dict[str, Any] = lastEvent.objattrs.get("modified", dict())
            lastRemoved: dict[str, Any] = lastEvent.objattrs.get("removed", dict())

            # Newly added attributes
            for attr, newval in lastAdded.items():
                # Merge prev added with last added
                objattrs["added"][attr] = newval
                # As attr is now added, it should not be removed anymore
                if attr in objattrs["removed"]:
                    del objattrs["removed"][attr]

            # Newly modified attributes
            for attr, newval in lastModified.items():
                if attr in objattrs["added"]:
                    # If was added in prev, keep it as added, but update its value
                    objattrs["added"][attr] = newval
                else:
                    # Keep or update the last modified value
                    objattrs["modified"][attr] = newval

            # Newly removed attributes
            for attr, newval in lastRemoved.items():
                if attr in objattrs["added"]:
                    # Added + removed : nothing to keep
                    del objattrs["added"][attr]
                else:
                    if attr in objattrs["modified"]:
                        # Modified + removed : don't keep the modified
                        del objattrs["modified"][attr]
                    # Keep the removed
                    objattrs["removed"][attr] = newval

            __hermes__.logger.info(
                f"Merging modified {prevEvent.objattrs=} with modified"
                f" {lastEvent.objattrs=}, result is modified {objattrs=}"
            )
            return (True, False, mergedEvent)

        if transition == ("modified", "removed"):
            if self._autoremediate == "maximum":
                # Remove prevEvent
                __hermes__.logger.info(
                    f"Merging modified {prevEvent.objattrs=} with removed"
                    f" {lastEvent.objattrs=}, result is removed {lastEvent=}"
                )
                return (True, False, lastEvent)
            # Use "conservative" as fallback : don't merge the events
            return (False, False, None)

        errmsg = (
            "BUG : unexpected eventtype met when trying to merge two events "
            f"{lastEvent=} {lastEvent.eventtype=}"
            f" ; {prevEvent=} {prevEvent.eventtype=}"
        )
        __hermes__.logger.critical(errmsg)
        raise AssertionError(errmsg)

    def _remediateWithPrevious(self, eventNumber: int):
        """Try to merge the event of specified eventNumber with the previous event
        in queue concerning the same object (same local objtype and objpkey).

        Nothing is done if there is no previous event, or if one of the events
        involved has already been partially processed. Remote and local events
        are merged separately with _mergeEvents(); the local result decides
        whether both entries are removed, or the previous one is replaced by the
        merged events and the last one is removed.

        Raises AssertionError if only one of the remote and local merges was done.
        """
        lastEventNumber = eventNumber
        lastRemoteEvent, lastLocalEvent, lastErrorMsg = self._queue[lastEventNumber]

        allEventNumbers = sorted(
            self._index[lastLocalEvent.objtype][lastLocalEvent.objpkey]
        )
        allEvents = [self._queue[evNum] for evNum in allEventNumbers]
        if len(allEvents) < 2:
            # No previous event to remediate with
            return

        # Events of the object that are older than the two events to merge
        previousRemoteEvents = [i[0] for i in allEvents[:-2]]
        previousLocalEvents = [i[1] for i in allEvents[:-2]]

        prevEventNumber = allEventNumbers[-2]
        prevRemoteEvent, prevLocalEvent, prevErrorMsg = allEvents[-2]

        # Can't merge partially processed events
        if (
            prevLocalEvent.isPartiallyProcessed
            or lastLocalEvent.isPartiallyProcessed
            or (prevRemoteEvent is not None and prevRemoteEvent.isPartiallyProcessed)
            or (lastRemoteEvent is not None and lastRemoteEvent.isPartiallyProcessed)
        ):
            stepsvalues = []
            if prevRemoteEvent is not None:
                stepsvalues.append(f"{prevRemoteEvent.isPartiallyProcessed=}")
            else:
                stepsvalues.append(f"{prevRemoteEvent=}")
            stepsvalues.append(f"{prevLocalEvent.isPartiallyProcessed=}")
            if lastRemoteEvent is not None:
                stepsvalues.append(f"{lastRemoteEvent.isPartiallyProcessed=}")
            else:
                stepsvalues.append(f"{lastRemoteEvent=}")
            stepsvalues.append(f"{lastLocalEvent.isPartiallyProcessed=}")

            __hermes__.logger.info(
                "Unable to merge two events of which at least one has already been"
                f" partially processed. {' '.join(stepsvalues)}"
            )
            return

        remotedWasMerged, remoteRemoveBothEvents, newRemoteEvent = self._mergeEvents(
            prevRemoteEvent,
            lastRemoteEvent,
            self._remotedata,
            self._remotedata_complete,
            previousRemoteEvents,
        )
        localWasMerged, localRemoveBothEvents, newLocalEvent = self._mergeEvents(
            prevLocalEvent,
            lastLocalEvent,
            self._localdata,
            self._localdata_complete,
            previousLocalEvents,
        )

        # NOTE(review): a None remote event is always reported as merged by
        # _mergeEvents(), so this check may also trigger for "pure" local events
        # whose local merge was refused - confirm this is the intended behavior
        if remotedWasMerged != localWasMerged:
            # Remote events may be None: describe them without dereferencing, so
            # the intended AssertionError is raised instead of an AttributeError
            prevRemoteDesc = (
                None if prevRemoteEvent is None else prevRemoteEvent.toString(set())
            )
            lastRemoteDesc = (
                None if lastRemoteEvent is None else lastRemoteEvent.toString(set())
            )
            errmsg = (
                "BUG : inconsistency between remote and local merge results :"
                f" {remotedWasMerged=}, {localWasMerged=}. {prevEventNumber=},"
                f" {lastEventNumber=}"
                f" prevRemoteEvent.toString(set())={prevRemoteDesc!r}"
                f" lastRemoteEvent.toString(set())={lastRemoteDesc!r},"
                f" {prevLocalEvent.toString(set())=}, {lastLocalEvent.toString(set())=}"
            )
            __hermes__.logger.critical(errmsg)
            raise AssertionError(errmsg)

        if not localWasMerged:
            # No merge was done
            return

        # Local processing result is the only one that really matters here
        if localRemoveBothEvents:
            self.remove(lastEventNumber)
            self.remove(prevEventNumber)
        else:
            # Last event was merged into previous, update previous and remove last from
            # queue
            self._queue[prevEventNumber] = (newRemoteEvent, newLocalEvent, prevErrorMsg)
            self.remove(lastEventNumber)

    def _addEventToIndex(self, eventNumber: int):
        """Add specified event to index

        Raises IndexError if eventNumber doesn't exist in queue.
        """
        if eventNumber not in self._queue:
            raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")

        localEvent: Event = self._queue[eventNumber][1]

        # Create the objtype and objpkey sublevels if they don't exist yet, then
        # add specified eventNumber to the set
        self._index.setdefault(localEvent.objtype, {}).setdefault(
            localEvent.objpkey, set()
        ).add(eventNumber)

    def _addParentObjs(self, eventNumber: int):
        """Index parent objects of specified event

        The object of the local event is looked up in the local datasource, then
        in the complete local datasource when available. Nothing is indexed if
        no local datasource is set or if the object can't be found.

        Raises IndexError if eventNumber doesn't exist in queue.
        """
        if self._localdata is None:
            # Unable to proceed without a datasource
            return

        if eventNumber not in self._queue:
            raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")

        localEvent: Event = self._queue[eventNumber][1]

        fromobj: DataObject | None = self._localdata.get(localEvent.objtype, {}).get(
            localEvent.objpkey, None
        )
        if fromobj is None and self._localdata_complete is not None:
            fromobj = self._localdata_complete.get(localEvent.objtype, {}).get(
                localEvent.objpkey, None
            )
        if fromobj is None:
            return

        for parent in ForeignKey.fetchParentObjs(self._localdata, fromobj):
            # Create the objtype and objpkey sublevels if they don't exist yet,
            # then add specified eventNumber to the set
            self._parentObjs.setdefault(parent.getType(), {}).setdefault(
                parent.getPKey(), set()
            ).add(eventNumber)

    def updateErrorMsg(self, eventNumber: int, errorMsg: str):
        """Update errorMsg of specified eventNumber

        Raises IndexError if eventNumber doesn't exist in queue.
        """
        if eventNumber not in self._queue:
            raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")

        remoteEvent, localEvent, _ = self._queue[eventNumber]
        self._queue[eventNumber] = (remoteEvent, localEvent, errorMsg)

    def remove(self, eventNumber: int, ignoreMissingEventNumber=False):
        """Remove event of specified eventNumber from queue, and from both indexes

        Raises IndexError if eventNumber doesn't exist in queue, unless
        ignoreMissingEventNumber is True.
        """
        if eventNumber not in self._queue:
            if ignoreMissingEventNumber:
                return
            raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")

        localEvent: Event = self._queue[eventNumber][1]

        del self._queue[eventNumber]  # Remove data from queue

        objtype = localEvent.objtype

        # Remove from index
        self._index[objtype][localEvent.objpkey].remove(eventNumber)

        # Purge index uplevels when empty
        if not self._index[objtype][localEvent.objpkey]:
            del self._index[objtype][localEvent.objpkey]

        if not self._index[objtype]:
            del self._index[objtype]

        # Remove all references of specified eventNumber from _parentObjs.
        # Loop over snapshots of the keys, as entries are deleted on the fly
        for parentType in tuple(self._parentObjs.keys()):
            for parentPKey in tuple(self._parentObjs[parentType].keys()):
                # Remove from parentObjs
                self._parentObjs[parentType][parentPKey].discard(eventNumber)

                # Purge index uplevels when empty
                if not self._parentObjs[parentType][parentPKey]:
                    del self._parentObjs[parentType][parentPKey]

            if not self._parentObjs[parentType]:
                del self._parentObjs[parentType]

    def __iter__(self) -> Iterator[tuple[int, Event | None, Event, str | None]]:
        """Returns an iterator of current instance events to process
        It will ignore events about an object that still have older event in queue

        Each entry will contains 4 values:
          1. eventNumber: int
          2. remoteEvent: remote event, or None if the entry was generated by a
             "pure" local event (eg. a change in the client datamodel)
          3. localEvent: local event
          4. errorMsg: str or None
        """
        # Loop over a snapshot of the keys, as the queue may change during iteration
        for eventNumber in list(self._queue):
            # Event may have been removed during iteration
            # e.g. a call to purgeAllEventsOfDataObject() may remove several events
            if eventNumber not in self._queue:
                continue

            remoteEvent, localEvent, errorMsg = self._queue[eventNumber]
            objindex = self._index[localEvent.objtype][localEvent.objpkey]

            # If current event isn't the first of its object index, ignore it
            # because the previous must be processed before
            if eventNumber != min(objindex):
                continue

            yield (eventNumber, remoteEvent, localEvent, errorMsg)

    def allEvents(self) -> Iterator[tuple[int, Event | None, Event, str | None]]:
        """Returns an iterator of all current instance events

        Each entry will contains 4 values:
          1. eventNumber: int
          2. remoteEvent: remote event, or None if the entry was generated by a
             "pure" local event (eg. a change in the client datamodel)
          3. localEvent: local event
          4. errorMsg: str or None
        """
        # Loop over a snapshot of the keys, as the queue may change during iteration
        for eventNumber in list(self._queue):
            # Event may have been removed during iteration
            # e.g. a call to purgeAllEventsOfDataObject() may remove several events
            if eventNumber not in self._queue:
                continue

            remoteEvent, localEvent, errorMsg = self._queue[eventNumber]
            yield (eventNumber, remoteEvent, localEvent, errorMsg)

    def __len__(self) -> int:
        """Returns the number of Event in queue"""
        return len(self._queue)

    def keys(self) -> set[int]:
        """Returns a set with every event number in current instance"""
        return set(self._queue.keys())

    def _getLocalObjtype(self, objtype: str, isLocalObjtype: bool) -> list[str] | None:
        """Returns the list of local objtypes corresponding to specified objtype,
        or None if it is unknown in the types mapping.

        If isLocalObjtype is True, objtype is a local type and the list only
        contains it; otherwise objtype is a remote type and the list contains
        every local type mapped to it.
        """
        if not isLocalObjtype:
            return self._typesMapping["remote"].get(objtype)
        if objtype in self._typesMapping["local"]:
            return [objtype]
        return None

    def containsObject(self, objtype: str, objpkey: Any, isLocalObjtype: bool) -> bool:
        """Indicate if object of specified objpkey of specified objtype exists in
        current instance"""
        l_objtypes = self._getLocalObjtype(objtype, isLocalObjtype)
        if l_objtypes is None:
            return False
        return any(
            objpkey in self._index.get(l_objtype, {}) for l_objtype in l_objtypes
        )

    def containsObjectByEvent(self, event: Event, isLocalEvent: bool) -> bool:
        """Indicate if object of specified event exists in current instance"""
        return self.containsObject(event.objtype, event.objpkey, isLocalEvent)

    def isEventAParentOfAnotherError(self, event: Event, isLocalEvent: bool) -> bool:
        """Indicate if object of specified event is a parent (a foreign key) of
        an object that exists in current instance"""
        l_objtypes = self._getLocalObjtype(event.objtype, isLocalEvent)
        if l_objtypes is None:
            return False
        return any(
            event.objpkey in self._parentObjs.get(l_objtype, {})
            for l_objtype in l_objtypes
        )

    def purgeAllEvents(self, objtype: str, objpkey: Any, isLocalObjtype: bool):
        """Delete all events of specified objpkey of specified objtype from current
        instance. Nothing is done if the objtype is unknown or if the object has
        no event in queue"""
        l_objtypes = self._getLocalObjtype(objtype, isLocalObjtype)
        if l_objtypes is None:
            return

        for l_objtype in l_objtypes:
            # A remote objtype may be mapped to several local objtypes, and only
            # some of them may have events in queue: don't assume the index
            # contains an entry for each of them
            eventNumbers = self._index.get(l_objtype, {}).get(objpkey)
            if not eventNumbers:
                continue

            # Loop over a copy as content may be removed during iteration
            for eventNumber in tuple(eventNumbers):
                self.remove(eventNumber)

    def purgeAllEventsOfDataObject(self, obj: DataObject, isLocalObjtype: bool):
        """Delete all events of specified object from current instance"""
        self.purgeAllEvents(obj.getType(), obj.getPKey(), isLocalObjtype)

    @staticmethod
    def _newPKeyValue(obj: DataObject, pkeyattr: str | tuple[str, ...]) -> Any:
        """Returns the value of the new primary key of specified object: a tuple
        of attribute values if pkeyattr is a tuple, a single attribute value
        otherwise"""
        if isinstance(pkeyattr, tuple):
            return tuple(getattr(obj, attr) for attr in pkeyattr)
        return getattr(obj, pkeyattr)

    @staticmethod
    def _fetchObjForPKeyUpdate(
        event: Event, data: Datasource, data_complete: Datasource
    ) -> DataObject:
        """Returns the object targeted by specified event, looked up in 'data'
        first, then in 'data_complete' (KeyError is raised if not found there)"""
        obj = data[event.objtype].get(event.objpkey)
        if obj is None:
            obj = data_complete[event.objtype][event.objpkey]
        return obj

    def updatePrimaryKeys(
        self,
        new_remote_pkeys: dict[str, str | tuple[str, ...]],
        remote_data: Datasource,
        remote_data_complete: Datasource,
        new_local_pkeys: dict[str, str | tuple[str, ...]],
        local_data: Datasource,
        local_data_complete: Datasource,
    ):
        """Will update primary keys. new_remote_pkeys is a dict with remote objtype as
        key, and the new remote primary key attribute name as value. new_local_pkeys is
        a dict with local objtype as key, and the new local primary key attribute name
        as value. For a composite primary key, the value is a tuple of attribute
        names.
        The specified datasources must not have their primary keys updated yet, to
        allow conversion.

        The ErrorQueue MUST immediately be saved and re-instantiated from cache by
        caller to reflect data changes (only the queue is updated here, not the
        indexes).
        """
        newqueue: dict[int, tuple[Event | None, Event, str | None]] = {}
        for eventNumber, (remoteEvent, localEvent, errorMsg) in self._queue.items():
            # Remote event, if any. Left untouched if its objtype has no pkey update
            newRemoteEvent = remoteEvent
            if remoteEvent is not None and remoteEvent.objtype in new_remote_pkeys:
                oldobj = self._fetchObjForPKeyUpdate(
                    remoteEvent, remote_data, remote_data_complete
                )
                # Save modified remote event
                newRemoteEvent = deepcopy(remoteEvent)
                newRemoteEvent.objpkey = self._newPKeyValue(
                    oldobj, new_remote_pkeys[remoteEvent.objtype]
                )

            # Local event. Left untouched if its objtype has no pkey update
            newLocalEvent = localEvent
            if localEvent.objtype in new_local_pkeys:
                pkeyattr = new_local_pkeys[localEvent.objtype]
                oldobj = self._fetchObjForPKeyUpdate(
                    localEvent, local_data, local_data_complete
                )
                newpkey = self._newPKeyValue(oldobj, pkeyattr)

                # Save modified local event
                newLocalEvent = deepcopy(localEvent)
                newLocalEvent.objpkey = newpkey

                # Update reserved _pkey* attributes in added events
                if newLocalEvent.eventtype == "added":
                    # Remove previous pkey attributes
                    for attr in list(newLocalEvent.objattrs.keys()):
                        if attr.startswith("_pkey_"):
                            del newLocalEvent.objattrs[attr]
                    # Add new pkey attributes
                    if isinstance(pkeyattr, tuple):
                        for attr, value in zip(pkeyattr, newpkey):
                            newLocalEvent.objattrs[attr] = value
                    else:
                        newLocalEvent.objattrs[pkeyattr] = newpkey

            # Save modified entry
            newqueue[eventNumber] = (newRemoteEvent, newLocalEvent, errorMsg)

        self._queue = newqueue