Coverage for clients/errorqueue.py: 97%
326 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from copy import deepcopy
24from typing import Any, Iterable
26from lib.datamodel.event import Event
27from lib.datamodel.dataobject import DataObject
28from lib.datamodel.datasource import Datasource
29from lib.datamodel.foreignkey import ForeignKey
30from lib.datamodel.serialization import LocalCache
class HermesInvalidErrorQueueJSONError(Exception):
    """Signal that the JSON data given to build an ErrorQueue is not valid"""
37class ErrorQueue(LocalCache):
38 """Store and manage an indexed event queue. Useful for retrying Event in error"""
40 def __init__(
41 self,
42 typesMapping: dict[str, str],
43 remotedata: Datasource | None = None,
44 remotedata_complete: Datasource | None = None,
45 localdata: Datasource | None = None,
46 localdata_complete: Datasource | None = None,
47 from_json_dict: dict[str, Any] | None = None,
48 autoremediate: str = "disabled",
49 ):
50 """Create an empty Event queue, or load it from specified 'from_json_dict'"""
52 if autoremediate in ("conservative", "maximum"):
53 self._autoremediate: str | None = autoremediate
54 """If set, indicate the policy to use for autoremediation"""
55 else:
56 self._autoremediate = None
58 self._queue: dict[int, tuple[Event | None, Event, str | None]] = {}
59 """The event queue, key is a unique integer, value is a tuple with the
60 remote event (or None if the entry was generated by a "pure" local event,
61 eg. a change in the client datamodel), the local event, and a string
62 containing an optional error message"""
64 self._index: dict[str, dict[Any, set[int]]] = {}
65 """Index table of events.
66 The keys are
67 1. the local event object type (str)
68 2. the event object primary key (Any)
69 The value is a set containing all eventNumber in queue for the keys
71 self._index[localevent.objtype][localevent.objpkey] =
72 set([eventNumber1, eventNumber2, ...])
73 """
75 self._parentObjs: dict[str, dict[Any, set[int]]] = {}
76 """Index of parent objects of objects in errors, with the eventNumber
77 list that have stored them in this state.
78 The keys are
79 1. the local object type (str)
80 2. the object primary key (Any)
81 The value is a set containing all eventNumber in queue for the keys
83 self._parentObjs[objtype][objpkey] = set([eventNumber1, eventNumber2, ...])
84 """
86 self._typesMapping = {
87 "local": {v: k for k, v in typesMapping.items()},
88 "remote": {k: v for k, v in typesMapping.items()},
89 }
90 """Mapping between local and remote objects types
91 - self._typesMapping["local"]["local_type"] return the corresponding remote
92 type
93 - self._typesMapping["remote"]["remote_type"] return the corresponding
94 local type
95 """
97 super().__init__(jsondataattr=["_queue"])
99 self.updateDatasources(
100 remotedata, remotedata_complete, localdata, localdata_complete
101 )
103 if from_json_dict:
104 if from_json_dict.keys() != set(["_queue"]):
105 raise HermesInvalidErrorQueueJSONError(f"{from_json_dict=}")
106 else:
107 # Prevent changes on deep references of from_json_dict
108 from_json = deepcopy(from_json_dict)
109 for eventNumber, (
110 remoteEventDict,
111 localEventDict,
112 errorMsg,
113 ) in from_json["_queue"].items():
114 if remoteEventDict is None:
115 remoteEvent = None
116 else:
117 remoteEvent = Event(from_json_dict=remoteEventDict)
118 localEvent = Event(from_json_dict=localEventDict)
119 self._append(remoteEvent, localEvent, errorMsg, int(eventNumber))
121 def updateDatasources(
122 self,
123 remotedata: Datasource | None = None,
124 remotedata_complete: Datasource | None = None,
125 localdata: Datasource | None = None,
126 localdata_complete: Datasource | None = None,
127 ):
128 """Update the references to the datasources used, required for a case in
129 autoremediation"""
130 self._remotedata: Datasource | None = remotedata
131 self._remotedata_complete: Datasource | None = remotedata_complete
132 self._localdata: Datasource | None = localdata
133 self._localdata_complete: Datasource | None = localdata_complete
135 # Reset parentObjs and refill it
136 self._parentObjs = {}
137 for eventNumber in self._queue.keys():
138 self._addParentObjs(eventNumber)
140 def append(
141 self, remoteEvent: Event | None, localEvent: Event | None, errorMsg: str | None
142 ):
143 """Append specified event to queue"""
144 self._append(
145 remoteEvent, localEvent, errorMsg, 1 + max(self._queue.keys(), default=0)
146 )
148 def _append(
149 self,
150 remoteEvent: Event | None,
151 localEvent: Event | None,
152 errorMsg: str | None,
153 eventNumber: int,
154 ):
155 """Append specified event to queue, at specified eventNumber"""
156 if eventNumber in self._queue:
157 raise IndexError(f"Specified {eventNumber=} already exist in queue")
159 if (
160 remoteEvent is not None
161 and remoteEvent.objtype not in self._typesMapping["remote"]
162 ):
163 __hermes__.logger.info(
164 "Ignore loading of remote event of unknown objtype"
165 f" {remoteEvent.objtype}"
166 )
167 return
169 if localEvent.objtype not in self._typesMapping["local"]:
170 __hermes__.logger.info(
171 f"Ignore loading of local event of unknown objtype {localEvent.objtype}"
172 )
173 return
175 self._queue[eventNumber] = (remoteEvent, localEvent, errorMsg)
176 self._addEventToIndex(eventNumber)
177 self._addParentObjs(eventNumber)
179 if self._autoremediate:
180 self._remediateWithPrevious(eventNumber)
182 def _mergeEvents(
183 self,
184 prevEvent: Event | None,
185 lastEvent: Event | None,
186 datasource: Datasource | None,
187 datasource_complete: Datasource | None,
188 previousEvents: list[Event],
189 ) -> tuple[bool, Event | None]:
190 """Merge two events for remediation, and returns the result as a tuple with
191 - wasMerged : a boolean indicating that the merge was done, meaning that the
192 last event must be removed. If False, the values of removeBothEvents and
193 newEvent in tuple must be ignored
194 - removeBothEvents: a boolean indicating that the merge consists of both events
195 removal. If True, the value of newEvent must be ignored
196 - newEvent: the merged Event
197 """
198 # Handle None values, that may only occurs for remote events
199 if lastEvent is None and prevEvent is None:
200 # No data : merging is easy
201 __hermes__.logger.info("Merging two None events, result is None")
202 return (True, False, None)
203 elif lastEvent is None:
204 # Keep prevEvent values
205 __hermes__.logger.info(
206 f"Merging {prevEvent.objattrs=} with lastEvent=None,"
207 f" result is {prevEvent=}"
208 )
209 return (True, False, prevEvent)
210 elif prevEvent is None:
211 # Keep lastEvent values
212 __hermes__.logger.info(
213 f"Merging prevEvent=None with {lastEvent.objattrs=},"
214 f" result is {lastEvent=}"
215 )
216 return (True, False, lastEvent)
218 if (
219 (prevEvent.eventtype == "added" and lastEvent.eventtype == "added")
220 or (prevEvent.eventtype == "removed" and lastEvent.eventtype == "modified")
221 or (prevEvent.eventtype == "removed" and lastEvent.eventtype == "removed")
222 or (prevEvent.eventtype == "modified" and lastEvent.eventtype == "added")
223 ):
224 errmsg = (
225 f"BUG : trying to merge a {lastEvent.eventtype} event with a"
226 f" previous {prevEvent.eventtype} event, this should never happen."
227 f" {lastEvent=} {prevEvent=}"
228 )
229 __hermes__.logger.critical(errmsg)
230 raise AssertionError(errmsg)
232 elif prevEvent.eventtype == "added" and lastEvent.eventtype == "modified":
233 # Merge the modified event data into the added event
234 mergedEvent = deepcopy(prevEvent)
235 objattrs = mergedEvent.objattrs
236 objattrs.update(lastEvent.objattrs.get("added", dict()))
237 objattrs.update(lastEvent.objattrs.get("modified", dict()))
238 for key in (
239 objattrs.keys() & lastEvent.objattrs.get("removed", dict()).keys()
240 ):
241 del objattrs[key]
243 __hermes__.logger.info(
244 f"Merging added {prevEvent.objattrs=} with modified"
245 f" {lastEvent.objattrs=}, result is added {objattrs=}"
246 )
247 return (True, False, mergedEvent)
248 elif prevEvent.eventtype == "added" and lastEvent.eventtype == "removed":
249 if self._autoremediate == "maximum":
250 # Remove the two events
251 return (True, True, None)
252 # Use "conservative" as fallback : don't merge the events
253 return (False, False, None)
254 elif prevEvent.eventtype == "removed" and lastEvent.eventtype == "added":
255 if self._autoremediate == "maximum":
256 # Ensure required data to process is available, otherwise fallback to
257 # "conservative" policy
258 if datasource is None:
259 __hermes__.logger.info(
260 f"Unable to merge removed {prevEvent=} with added "
261 f"{lastEvent.objattrs=}, as no datasource is available. "
262 "Fallback to 'conservative' mode."
263 )
264 else:
265 currentObj: DataObject | None = datasource.get(
266 lastEvent.objtype, {}
267 ).get(lastEvent.objpkey, None)
269 newObj: DataObject | None = datasource_complete.get(
270 lastEvent.objtype, {}
271 ).get(lastEvent.objpkey, None)
273 if len(previousEvents) != 0:
274 # There are previous unprocesed events in error queue.
275 # Merging is possible but first we have to apply previous
276 # events changes to (a copy of) currentObj in order to
277 # determine object's state before prevEvent
279 # Imported here to avoid circular dependency
280 from .datamodel import Datamodel
282 currentObj = deepcopy(currentObj)
283 ev: Event
284 for ev in previousEvents:
285 if ev is None:
286 continue
287 elif ev.eventtype == "added":
288 currentObj = Datamodel.createDataobject(
289 datasource.schema, ev.objtype, ev.objattrs
290 )
291 elif ev.eventtype == "modified":
292 if currentObj is None:
293 # Should never occur
294 errmsg = (
295 "BUG : unexpected object status met when trying"
296 f" to merge two events {lastEvent=}"
297 f" {lastEvent.eventtype=} ; {prevEvent=}"
298 f" {prevEvent.eventtype=}"
299 )
300 __hermes__.logger.critical(errmsg)
301 raise AssertionError(errmsg)
302 currentObj = Datamodel.getUpdatedObject(
303 currentObj, ev.objattrs
304 )
305 elif ev.eventtype == "removed":
306 # Should never occur
307 currentObj = None
309 if currentObj is None or newObj is None:
310 __hermes__.logger.warning(
311 f"BUG ? - Unable to merge removed {prevEvent=} with added "
312 f"{lastEvent.objattrs=}, as related object was not found "
313 f"in caches. {currentObj=} {newObj=}"
314 )
315 else:
316 # Diff the desired object with current one to generate a
317 # modified event
318 mergedEvent, _ = Event.fromDiffItem(
319 newObj.diffFrom(currentObj), "base", "modified"
320 )
322 if (
323 mergedEvent.objattrs["added"]
324 or mergedEvent.objattrs["modified"]
325 or mergedEvent.objattrs["removed"]
326 ):
327 __hermes__.logger.info(
328 f"Merging removed {prevEvent=} with added"
329 f" {lastEvent.objattrs=}, result is modified"
330 f" {mergedEvent=} {mergedEvent.objattrs=}"
331 )
332 return (True, False, mergedEvent)
333 else:
334 __hermes__.logger.info(
335 f"Merging removed {prevEvent=} with added "
336 f"{lastEvent.objattrs=}, result is an empty modified"
337 " event (without any change). Ignoring it"
338 )
339 return (True, True, mergedEvent)
341 # Use "conservative" as fallback : don't merge the events
342 return (False, False, None)
343 elif prevEvent.eventtype == "modified" and lastEvent.eventtype == "modified":
344 # Merge the two modified events
345 mergedEvent = deepcopy(prevEvent)
346 objattrs = mergedEvent.objattrs
348 lastAdded: dict[str, Any] = lastEvent.objattrs.get("added", dict())
349 lastModified: dict[str, Any] = lastEvent.objattrs.get("modified", dict())
350 lastRemoved: dict[str, Any] = lastEvent.objattrs.get("removed", dict())
352 # Newly added attributes
353 for attr, newval in lastAdded.items():
354 # # Newly added should not exist in prev modified
355 # if attr in objattrs["modified"]:
356 # raise AssertionError
357 # Merge prev added with last added
358 objattrs["added"][attr] = newval
359 # As attr is now added, it should not be removed anymore
360 if attr in objattrs["removed"]:
361 del objattrs["removed"][attr]
363 # Newly modified attributes
364 for attr, newval in lastModified.items():
365 # # Newly modified should not exist in prev removed
366 # if attr in objattrs["removed"]:
367 # raise AssertionError
368 # If was added in prev, keep it as added, but update its value
369 if attr in objattrs["added"]:
370 objattrs["added"][attr] = newval
371 else:
372 # Keep or update the last modified value
373 objattrs["modified"][attr] = newval
375 # Newly removed attributes
376 for attr, newval in lastRemoved.items():
377 # # Newly removed should not exist in prev removed
378 # if attr in objattrs["removed"]:
379 # raise AssertionError
380 if attr in objattrs["added"]:
381 # Added + removed : nothing to keep
382 del objattrs["added"][attr]
383 else:
384 if attr in objattrs["modified"]:
385 # Modified + removed : don't keep the modified
386 del objattrs["modified"][attr]
387 # Keep the removed
388 objattrs["removed"][attr] = newval
390 __hermes__.logger.info(
391 f"Merging modified {prevEvent.objattrs=} with modified"
392 f" {lastEvent.objattrs=}, result is modified {objattrs=}"
393 )
394 return (True, False, mergedEvent)
395 elif prevEvent.eventtype == "modified" and lastEvent.eventtype == "removed":
396 if self._autoremediate == "maximum":
397 # Remove prevEvent
398 __hermes__.logger.info(
399 f"Merging modified {prevEvent.objattrs=} with removed"
400 f" {lastEvent.objattrs=}, result is removed {lastEvent=}"
401 )
402 return (True, False, lastEvent)
403 # Use "conservative" as fallback : don't merge the events
404 return (False, False, None)
405 else:
406 errmsg = (
407 "BUG : unexpected eventtype met when trying to merge two events "
408 f"{lastEvent=} {lastEvent.eventtype=}"
409 f" ; {prevEvent=} {prevEvent.eventtype=}"
410 )
411 __hermes__.logger.critical(errmsg)
412 raise AssertionError(errmsg)
414 def _remediateWithPrevious(self, eventNumber: int):
415 lastEventNumber = eventNumber
416 (lastRemoteEvent, lastLocalEvent, lastErrorMsg) = self._queue[lastEventNumber]
418 allEventNumbers = sorted(
419 self._index[lastLocalEvent.objtype][lastLocalEvent.objpkey]
420 )
421 allEvents = [self._queue[evNum] for evNum in allEventNumbers]
422 if len(allEvents) < 2:
423 # No previous event to remediate with
424 return
426 previousRemoteEvents = [i[0] for i in allEvents[:-2]]
427 previousLocalEvents = [i[1] for i in allEvents[:-2]]
429 prevEventNumber = allEventNumbers[-2]
430 (prevRemoteEvent, prevLocalEvent, prevErrorMsg) = allEvents[-2]
432 # Can't merge partially processed events
433 if (
434 prevLocalEvent.isPartiallyProcessed
435 or lastLocalEvent.isPartiallyProcessed
436 or (prevRemoteEvent is not None and prevRemoteEvent.isPartiallyProcessed)
437 or (lastRemoteEvent is not None and lastRemoteEvent.isPartiallyProcessed)
438 ):
439 stepsvalues = []
440 if prevRemoteEvent is not None:
441 stepsvalues.append(f"{prevRemoteEvent.isPartiallyProcessed=}")
442 else:
443 stepsvalues.append(f"{prevRemoteEvent=}")
444 stepsvalues.append(f"{prevLocalEvent.isPartiallyProcessed=}")
445 if lastRemoteEvent is not None:
446 stepsvalues.append(f"{lastRemoteEvent.isPartiallyProcessed=}")
447 else:
448 stepsvalues.append(f"{lastRemoteEvent=}")
449 stepsvalues.append(f"{lastLocalEvent.isPartiallyProcessed=}")
451 __hermes__.logger.info(
452 "Unable to merge two events of which at least one has already been"
453 f" partially processed. {' '.join(stepsvalues)}"
454 )
455 return
457 (remotedWasMerged, remoteRemoveBothEvents, newRemoteEvent) = self._mergeEvents(
458 prevRemoteEvent,
459 lastRemoteEvent,
460 self._remotedata,
461 self._remotedata_complete,
462 previousRemoteEvents,
463 )
464 (localWasMerged, localRemoveBothEvents, newLocalEvent) = self._mergeEvents(
465 prevLocalEvent,
466 lastLocalEvent,
467 self._localdata,
468 self._localdata_complete,
469 previousLocalEvents,
470 )
472 if remotedWasMerged != localWasMerged:
473 errmsg = (
474 "BUG : inconsistency between remote and local merge results :"
475 f" {remotedWasMerged=}, {localWasMerged=}. {prevEventNumber=},"
476 f" {lastEventNumber=} {prevRemoteEvent.toString(set())=}"
477 f" {lastRemoteEvent.toString(set())=},"
478 f" {prevLocalEvent.toString(set())=}, {lastLocalEvent.toString(set())=}"
479 )
480 __hermes__.logger.critical(errmsg)
481 raise AssertionError(errmsg)
483 if not localWasMerged:
484 # No merge was done
485 return
487 # Local processing result is the only one that really matters here
488 if localRemoveBothEvents:
489 self.remove(lastEventNumber)
490 self.remove(prevEventNumber)
491 else:
492 # Last event was merged into previous, update previous and remove last from
493 # queue
494 self._queue[prevEventNumber] = (newRemoteEvent, newLocalEvent, prevErrorMsg)
495 self.remove(lastEventNumber)
497 def _addEventToIndex(self, eventNumber: int):
498 """Add specified event to index"""
499 if eventNumber not in self._queue:
500 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")
502 remoteEvent: Event | None
503 localEvent: Event
504 remoteEvent, localEvent, errorMsg = self._queue[eventNumber]
506 objtype = localEvent.objtype
508 # Create objtype sublevel if it doesn't exist yet
509 if objtype not in self._index:
510 self._index[objtype] = {}
512 if localEvent.objpkey not in self._index[objtype]:
513 # Create the set with specified eventNumber
514 self._index[objtype][localEvent.objpkey] = set([eventNumber])
515 else:
516 # Add specified eventNumber to the set
517 self._index[objtype][localEvent.objpkey].add(eventNumber)
519 def _addParentObjs(self, eventNumber: int):
520 """Index parent objects of specified event"""
521 if self._localdata is None:
522 # Unable to proceed without a datasource
523 return
525 if eventNumber not in self._queue:
526 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")
528 remoteEvent: Event | None
529 localEvent: Event
530 remoteEvent, localEvent, errorMsg = self._queue[eventNumber]
532 fromobj: DataObject | None = self._localdata.get(localEvent.objtype, {}).get(
533 localEvent.objpkey, None
534 )
535 if fromobj is None:
536 fromobj = self._localdata_complete.get(localEvent.objtype, {}).get(
537 localEvent.objpkey, None
538 )
539 if fromobj is None:
540 return
542 parents = ForeignKey.fetchParentObjs(self._localdata, fromobj)
544 for parent in parents:
545 objtype = parent.getType()
546 # Create objtype sublevel if it doesn't exist yet
547 if objtype not in self._parentObjs:
548 self._parentObjs[objtype] = {}
550 if parent.getPKey() not in self._parentObjs[objtype]:
551 # Create the set with specified eventNumber
552 self._parentObjs[objtype][parent.getPKey()] = set([eventNumber])
553 else:
554 # Add specified eventNumber to the set
555 self._parentObjs[objtype][parent.getPKey()].add(eventNumber)
557 def updateErrorMsg(self, eventNumber: int, errorMsg: str):
558 """Update errorMsg of specified eventNumber"""
559 if eventNumber not in self._queue:
560 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")
562 remoteEvent: Event | None
563 localEvent: Event
564 remoteEvent, localEvent, oldErrorMsg = self._queue[eventNumber]
565 self._queue[eventNumber] = (remoteEvent, localEvent, errorMsg)
567 def remove(self, eventNumber: int, ignoreMissingEventNumber=False):
568 """Remove event of specified eventNumber from queue"""
569 if eventNumber not in self._queue:
570 if ignoreMissingEventNumber:
571 return
572 else:
573 raise IndexError(f"Specified {eventNumber=} doesn't exist in queue")
575 remoteEvent: Event | None
576 localEvent: Event
577 remoteEvent, localEvent, errorMsg = self._queue[eventNumber]
579 del self._queue[eventNumber] # Remove data from queue
581 objtype = localEvent.objtype
583 # Remove from index
584 self._index[objtype][localEvent.objpkey].remove(eventNumber)
586 # Purge index uplevels when empty
587 if not self._index[objtype][localEvent.objpkey]:
588 del self._index[objtype][localEvent.objpkey]
590 if not self._index[objtype]:
591 del self._index[objtype]
593 # Remove all references of specified eventNumber from _parentObjs
594 for objtype in tuple(self._parentObjs.keys()):
595 for objpkey in tuple(self._parentObjs[objtype].keys()):
596 # Remove from parentObjs
597 self._parentObjs[objtype][objpkey].discard(eventNumber)
599 # Purge index uplevels when empty
600 if not self._parentObjs[objtype][objpkey]:
601 del self._parentObjs[objtype][objpkey]
603 if not self._parentObjs[objtype]:
604 del self._parentObjs[objtype]
606 def __iter__(self) -> Iterable:
607 """Returns an iterator of current instance events to process
608 It will ignore events about an object that still have older event in queue
610 Each entry will contains 4 values:
611 1. eventNumber: int
612 2. remoteEvent: remote event, or None if the entry was generated by a
613 "pure" local event (eg. a change in the client datamodel)
614 3. localEvent: local event
615 4. errorMsg: str or None
616 """
617 eventNumber: int
618 remoteEvent: Event | None
619 localEvent: Event
620 errorMsg: str | None
622 for eventNumber in list(self._queue.keys()):
623 # Event may have been removed during iteration
624 # e.g. a call to purgeAllEventsOfDataObject() may remove several events
625 if eventNumber not in self._queue:
626 continue
628 remoteEvent, localEvent, errorMsg = self._queue[eventNumber]
629 objindex = self._index[localEvent.objtype][localEvent.objpkey]
631 # If current event isn't the first of its object index, ignore it
632 # because the previous must be processed before
633 if eventNumber != min(objindex):
634 continue
636 yield (eventNumber, remoteEvent, localEvent, errorMsg)
638 def allEvents(self) -> Iterable:
639 """Returns an iterator of all current instance events
641 Each entry will contains 4 values:
642 1. eventNumber: int
643 2. remoteEvent: remote event, or None if the entry was generated by a
644 "pure" local event (eg. a change in the client datamodel)
645 3. localEvent: local event
646 4. errorMsg: str or None
647 """
648 eventNumber: int
649 remoteEvent: Event | None
650 localEvent: Event
651 errorMsg: str | None
653 for eventNumber in list(self._queue.keys()):
654 # Event may have been removed during iteration
655 # e.g. a call to purgeAllEventsOfDataObject() may remove several events
656 if eventNumber not in self._queue:
657 continue
659 remoteEvent, localEvent, errorMsg = self._queue[eventNumber]
660 yield (eventNumber, remoteEvent, localEvent, errorMsg)
662 def __len__(self) -> int:
663 """Returns the number of Event in queue"""
664 return len(self._queue)
666 def keys(self) -> set[int]:
667 """Returns a set with every event number in current instance"""
668 return set(self._queue.keys())
670 def _getLocalObjtype(self, objtype: str, isLocalObjtype: bool) -> str | None:
671 """Returns the local objtype, or None if it doesn't exist"""
672 if isLocalObjtype:
673 if objtype in self._typesMapping["local"]:
674 return objtype
675 else:
676 return None
677 else:
678 return self._typesMapping["remote"].get(objtype)
680 def containsObject(self, objtype: str, objpkey: Any, isLocalObjtype: bool) -> bool:
681 """Indicate if object of specified objpkey of specified objtype exists in
682 current instance"""
683 l_objtype = self._getLocalObjtype(objtype, isLocalObjtype)
684 if l_objtype is None:
685 return False
686 return l_objtype in self._index and objpkey in self._index[l_objtype]
688 def containsObjectByEvent(self, event: Event, isLocalEvent: bool) -> bool:
689 """Indicate if object of specified event exists in current instance"""
690 return self.containsObject(event.objtype, event.objpkey, isLocalEvent)
692 def isEventAParentOfAnotherError(self, event: Event, isLocalEvent: bool) -> bool:
693 """Indicate if object of specified event is a parent (a foreign key) of
694 an object that exists in current instance"""
695 l_objtype = self._getLocalObjtype(event.objtype, isLocalEvent)
696 if l_objtype is None:
697 return False
698 parentEvs = self._parentObjs.get(l_objtype, {}).get(event.objpkey)
699 return parentEvs is not None
701 def purgeAllEvents(self, objtype: str, objpkey: Any, isLocalObjtype: bool):
702 """Delete all events of specified objpkey of specified objtype from current
703 instance"""
704 if not self.containsObject(objtype, objpkey, isLocalObjtype):
705 return
707 l_objtype = self._getLocalObjtype(objtype, isLocalObjtype)
708 if l_objtype is None:
709 return
711 eventNumbers = self._index[l_objtype][objpkey]
713 # Loop over a copy as content may be removed during iteration
714 for eventNumber in eventNumbers.copy():
715 self.remove(eventNumber)
717 def purgeAllEventsOfDataObject(self, obj: DataObject, isLocalObjtype: bool):
718 """Delete all events of specified objpkey of specified objtype from current
719 instance"""
720 self.purgeAllEvents(obj.getType(), obj.getPKey(), isLocalObjtype)
722 def updatePrimaryKeys(
723 self,
724 new_remote_pkeys: dict[str, str],
725 remote_data: Datasource,
726 remote_data_complete: Datasource,
727 new_local_pkeys: dict[str, str],
728 local_data: Datasource,
729 local_data_complete: Datasource,
730 ):
731 """Will update primary keys. new_remote_pkeys is a dict with remote objtype as
732 key, and the new remote primary key attribute name as value. new_local_pkeys is
733 a dict with local objtype as key, and the new local primary key attribute name
734 as value.
735 The specified datasources must not have their primary keys updated yet, to
736 allow conversion.
738 The ErrorQueue MUST immediately be saved and re-instantiated from cache by
739 caller to reflect data changes.
740 """
741 newqueue = {}
742 for eventNumber, (remoteEvent, localEvent, errorMsg) in self._queue.items():
743 # Remote event, if any
744 if remoteEvent is None:
745 newRemoteEvent = None
746 else:
747 if remoteEvent.objtype not in new_remote_pkeys.keys():
748 # Objtype of remote event has no pkey update
749 newRemoteEvent = remoteEvent
750 else:
751 oldobj = remote_data[remoteEvent.objtype].get(remoteEvent.objpkey)
752 if oldobj is None:
753 oldobj = remote_data_complete[remoteEvent.objtype][
754 remoteEvent.objpkey
755 ]
756 if type(new_remote_pkeys[remoteEvent.objtype]) is tuple:
757 # New pkey is a tuple, loop over each attr
758 newpkey = []
759 for pkattr in new_remote_pkeys[remoteEvent.objtype]:
760 newpkey.append(getattr(oldobj, pkattr))
761 newpkey = tuple(newpkey)
762 else:
763 newpkey = getattr(oldobj, new_remote_pkeys[remoteEvent.objtype])
765 # Save modified remote event
766 newRemoteEvent = deepcopy(remoteEvent)
767 newRemoteEvent.objpkey = newpkey
769 # Local event
770 if localEvent.objtype not in new_local_pkeys.keys():
771 # Objtype of local event has no pkey update
772 newLocalEvent = localEvent
773 else:
774 oldobj = local_data[localEvent.objtype].get(localEvent.objpkey)
775 if oldobj is None:
776 oldobj = local_data_complete[localEvent.objtype][localEvent.objpkey]
777 if type(new_local_pkeys[localEvent.objtype]) is tuple:
778 # New pkey is a tuple, loop over each attr
779 newpkey = []
780 for pkattr in new_local_pkeys[localEvent.objtype]:
781 newpkey.append(getattr(oldobj, pkattr))
782 newpkey = tuple(newpkey)
783 else:
784 newpkey = getattr(oldobj, new_local_pkeys[localEvent.objtype])
786 # Save modified local event
787 newLocalEvent = deepcopy(localEvent)
788 newLocalEvent.objpkey = newpkey
790 # Update reserved _pkey* attributes in added events
791 if newLocalEvent.eventtype == "added":
792 # Remove previous pkey attributes
793 for attr in list(newLocalEvent.objattrs.keys()):
794 if attr.startswith("_pkey_"):
795 del newLocalEvent.objattrs[attr]
796 # Add new pkey attributes
797 if type(new_local_pkeys[localEvent.objtype]) is tuple:
798 for i in range(len(newpkey)):
799 newLocalEvent.objattrs[
800 new_local_pkeys[localEvent.objtype][i]
801 ] = newpkey[i]
802 else:
803 newLocalEvent.objattrs[new_local_pkeys[localEvent.objtype]] = (
804 newpkey
805 )
807 # Save modified entry
808 newqueue[eventNumber] = (newRemoteEvent, newLocalEvent, errorMsg)
810 self._queue = newqueue