Coverage for lib / datamodel / dataobjectlist.py: 99%
154 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:10 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:10 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import TypeVar, Any, Iterable
25import time
27from lib.datamodel.diffobject import DiffObject
28from lib.datamodel.dataobject import DataObject, HermesMergingConflictError
29from lib.datamodel.foreignkey import ForeignKey
30from lib.datamodel.serialization import LocalCache
# Type variable bound to DataObjectList: used below to annotate parameters that
# accept a DataObjectList or any of its subclasses (the bound is given as a string
# because the class is only defined further down in this module)
AnyDataObjectList = TypeVar("AnyDataObjectList", bound="DataObjectList")
class DataObjectList(LocalCache):
    """Generic serializable list of DataObject

    Subclasses must define the following class vars:
    - OBJTYPE: data type contained in list, mandatory for deserialization

    The class provides
    - data storage
    - index creation for DataObject.PRIMARYKEY_ATTRIBUTE.
      Indexed data is accessible by self[pkeyvalue].
    - inconsistencies (duplicates) detection and replacement by cache values
    - mergeConflicts detection and replacement by cache values
    - merge_constraints and integrity_constraints filtered items storage with
      respective attributes mergeFiltered and integrityFiltered
    - json serialization/deserialization
    - diffFrom() function generating a DiffObject
    """

    OBJTYPE: "type[DataObject]" = DataObject
    """Object type stored by current class"""

    FOREIGNKEYS: list[ForeignKey] = []
    """Foreign keys of current OBJTYPE"""

    def __init__(
        self,
        objlist: list[DataObject] | None = None,
        from_json_dict: list[dict[str, Any]] | None = None,
    ):
        """Create a new instance, with data coming from json (for deserialization),
        or from specified list of DataObject.

        Exactly one of objlist and from_json_dict must be specified.

        If data is from json, every object will be instantiated by deserialization.
        If data is from objlist, every object of another type than self.OBJTYPE will
        be cast to OBJTYPE.

        Raises AttributeError if none or both of the data sources are specified.
        """
        # "_data" is the name of the property exposing the content of the list
        super().__init__(jsondataattr="_data")

        self._inconsistencies: set[Any] = set()
        """Set containing primary keys of all duplicated entries"""

        self._mergeConflicts: set[Any] = set()
        """Set containing primary keys of each entry with a merge conflict (i.e. when
        the same attribute has different values on different sources)"""

        self.mergeFiltered: set[Any] = set()
        """Set containing primary keys of each entry filtered by merge constraints"""

        self.integrityFiltered: set[Any] = set()
        """Set containing primary keys of each entry filtered by integrity
        constraints"""

        self._datadict: dict[Any, DataObject] = {}
        """Dictionary containing the data, with primary keys as keys, and DataObject as
        values"""

        if objlist is None and from_json_dict is None:
            err = (
                "Cannot instantiate object from nothing: you must specify one data"
                " source"
            )
            __hermes__.logger.critical(err)
            raise AttributeError(err)

        if objlist is not None and from_json_dict is not None:
            err = "Cannot instantiate object from multiple data sources at once"
            __hermes__.logger.critical(err)
            raise AttributeError(err)

        # From here, exactly one of the two data sources is set
        if objlist is not None:
            self.__init_from_objlist__(objlist)
        else:
            self.__init_from_json_dict__(from_json_dict)

    def __init_from_json_dict__(self, from_json_dict: list[dict[str, Any]]):
        """Fill current instance with data coming from json.
        Every item of the list will be instantiated as OBJTYPE by deserialization,
        then appended to current instance."""
        self.__init_from_objlist__(
            [self.OBJTYPE(from_json_dict=item) for item in from_json_dict]
        )

    def __init_from_objlist__(self, objlist: list[DataObject]):
        """Fill current instance with data from specified list of DataObject.
        Every object of another type than self.OBJTYPE will be cast to OBJTYPE
        (see append()).
        """
        for obj in objlist:
            self.append(obj)

    @property
    def _data(self) -> list[DataObject]:
        """Returns a list of current DataObject values, ordered by primary key"""
        return [self._datadict[k] for k in sorted(self._datadict.keys())]

    def __len__(self) -> int:
        """Returns the number of items in current instance"""
        return len(self._datadict)

    def __iter__(self) -> Iterable:
        """Returns an iterator of current DataObject values, in the iteration order
        of the underlying dictionary (not sorted by primary key)"""
        return iter(self._datadict.values())

    def __getitem__(self, objOrPkey: Any) -> DataObject:
        """Indexer operator '[]' returning DataObject entry with same pkey as specified
        one, or as specified DataObject.

        Raises KeyError if no entry with this pkey exists."""
        if isinstance(objOrPkey, DataObject):
            return self._datadict[objOrPkey.getPKey()]
        else:
            return self._datadict[objOrPkey]

    def __contains__(self, objOrPkey: Any) -> bool:
        """'in' operator: return True if specified DataObject or pkey exists in current
        instance"""
        if isinstance(objOrPkey, DataObject):
            return objOrPkey.getPKey() in self._datadict
        else:
            return objOrPkey in self._datadict

    def get(self, pkey: Any, __default: Any = None) -> Any:
        """Returns DataObject entry with specified pkey, or __default value if no entry
        was found.

        The default value should be passed positionally: as a double-underscore
        name declared inside a class body, the parameter name is subject to Python
        private name mangling."""
        return self._datadict.get(pkey, __default)

    def getPKeys(self) -> set[Any]:
        """Returns a new set containing each primary key of current DataObject
        values"""
        return set(self._datadict.keys())

    def append(self, obj: DataObject, ignoreIfAlreadyPresent: bool = False):
        """Append specified object to current instance.

        If obj is of another type than self.OBJTYPE, it will be cast to OBJTYPE.

        Unless ignoreIfAlreadyPresent is True:
        - if the pkey of obj is already known as an inconsistency or a merge
          conflict, obj is silently ignored
        - if an entry with the same pkey is already in current instance, a warning
          is logged, the pkey is put in _inconsistencies and the existing entry is
          removed

        If ignoreIfAlreadyPresent is True and an entry with the same pkey already
        exists, the existing entry is kept and obj is ignored.
        """
        if type(obj) is self.OBJTYPE:
            objconverted = obj
        else:
            # Recreate object with the required type
            # (useful when merging data from datamodel)
            objconverted = self.OBJTYPE(from_json_dict=obj.toNative())

        pkey = objconverted.getPKey()
        # Two direct membership tests, to avoid building the union of both sets on
        # each call
        if not ignoreIfAlreadyPresent and (
            pkey in self._inconsistencies or pkey in self._mergeConflicts
        ):
            # Already known as an inconsistency or a merge conflict: ignore it
            return

        if pkey not in self._datadict:
            self._datadict[pkey] = objconverted
        elif not ignoreIfAlreadyPresent:
            __hermes__.logger.warning(
                f"<{self.__class__.__name__}> Trying to insert an already existing"
                f" object: {objconverted=}"
            )
            self._inconsistencies.add(pkey)
            del self._datadict[pkey]

    def replace(self, obj: DataObject):
        """Replace specified DataObject (i.e. with same pkey, but different values) in
        current instance.

        Raises IndexError if no entry with the pkey of obj exists."""
        pkey = obj.getPKey()
        if pkey not in self._datadict:
            raise IndexError(
                f"Cannot replace object with pkey {pkey} as previous doesn't exist"
            )
        self._datadict[pkey] = obj

    def removeByPkey(self, pkey: Any):
        """Remove DataObject corresponding to specified pkey from current instance.
        Does nothing if no entry with this pkey exists."""
        self._datadict.pop(pkey, None)

    def remove(self, obj: DataObject):
        """Remove specified DataObject from current instance.
        Does nothing if no entry with the pkey of obj exists."""
        self.removeByPkey(obj.getPKey())

    def toNative(self) -> list[dict[str, Any]]:
        """Return a list containing the toNative() result of each of the current
        DataObject values"""
        return [item.toNative() for item in self._datadict.values()]

    def mergeWith(
        self,
        objlist: list[DataObject],
        pkeyMergeConstraint: str,
        dontMergeOnConflict=False,
    ) -> set[Any]:
        """Merge specified objlist data in current instance.

        pkeyMergeConstraint must be one of:
        - "noConstraint": objects with an unknown pkey are appended, objects with an
          existing pkey are merged into the existing entry
        - "mustNotExist": objects with an unknown pkey are appended, entries whose
          pkey already exists are removed from current instance
        - "mustAlreadyExist": objects with an unknown pkey are ignored, objects with
          an existing pkey are merged into the existing entry
        - "mustExistInBoth": same as "mustAlreadyExist", and every entry of current
          instance that was not merged is removed

        dontMergeOnConflict is forwarded to DataObject.mergeWith(). When the merge
        of an entry raises HermesMergingConflictError, its pkey is put in
        mergeConflicts and the entry is removed from current instance.

        Returns a set containing pkeys of items filtered by pkeyMergeConstraint
        (ignored and removed ones).

        Raises AttributeError if pkeyMergeConstraint is not a valid value.
        """

        validsPkeyMergeConstraints = (
            "noConstraint",
            "mustNotExist",
            "mustAlreadyExist",
            "mustExistInBoth",
        )

        if pkeyMergeConstraint not in validsPkeyMergeConstraints:
            raise AttributeError(
                f"Specified {pkeyMergeConstraint=} is invalid."
                f" Valiid values are {validsPkeyMergeConstraints}"
            )

        pkeysMerged = set()
        pkeysToRemove = set()
        pkeysIgnored = set()

        for obj in objlist:
            pkey = obj.getPKey()
            # Direct lookup in the live dictionary, instead of copying every pkey
            # in a new set for each merged object
            if pkey not in self._datadict:
                if pkeyMergeConstraint in ("noConstraint", "mustNotExist"):
                    # Constraint is respected, add object
                    pkeysMerged.add(pkey)
                    self.append(obj)
                elif pkeyMergeConstraint in ("mustAlreadyExist", "mustExistInBoth"):
                    # Constraint isn't respected, don't merge object, nothing else to do
                    pkeysIgnored.add(pkey)
            else:
                if pkeyMergeConstraint in (
                    "noConstraint",
                    "mustAlreadyExist",
                    "mustExistInBoth",
                ):
                    # Constraint is respected, merge object
                    pkeysMerged.add(pkey)
                    newobj = self[pkey]
                    try:
                        newobj.mergeWith(obj, dontMergeOnConflict)
                    except HermesMergingConflictError:
                        self._mergeConflicts.add(pkey)
                        self.removeByPkey(pkey)
                    else:
                        # newobj may be a new instance, so overwrite current reference
                        # in datadict
                        self.replace(newobj)
                elif pkeyMergeConstraint == "mustNotExist":
                    # Constraint isn't respected, remove object
                    pkeysToRemove.add(pkey)

        if pkeyMergeConstraint == "mustExistInBoth":
            # Entries of current instance that were not found in objlist must be
            # removed too
            pkeysToRemove |= self.getPKeys() - pkeysMerged

        for pkey in pkeysToRemove:
            self.removeByPkey(pkey)

        __hermes__.logger.debug(
            f"pkey_merge_constraints: merged {len(pkeysMerged)} objects, ignored"
            f" {len(pkeysIgnored)} objects, removed {len(pkeysToRemove)} objects from"
            f" {type(self)}"
        )

        return pkeysIgnored | pkeysToRemove

    def diffFrom(self, other: AnyDataObjectList) -> DiffObject:
        """Returns a DiffObject containing differences between current instance and
        specified 'other', assuming current is the newest.

        - entries whose pkey only exists in 'other' are reported as removed
        - entries whose pkey only exists in current instance are reported as added
        - entries whose pkey exists in both are compared with DataObject.diffFrom()
          and reported as modified when the resulting diff is truthy
        """
        # Monotonic clock: the measured duration can't be altered by system clock
        # adjustments
        starttime = time.perf_counter()
        diff = DiffObject()

        s = self.getPKeys()
        o = other.getPKeys()
        commonpkeys = s & o

        diff.appendRemoved([other[pkey] for pkey in (o - s)])
        diff.appendAdded([self[pkey] for pkey in (s - o)])

        # Loop over the dictionary rather than over the set of common pkeys, to
        # report modified entries in the order of current instance
        for pkey, obj in self._datadict.items():
            if pkey in commonpkeys:
                diffobj = obj.diffFrom(other[pkey])
                if diffobj:
                    diff.appendModified(diffobj)

        elapsedtime = time.perf_counter() - starttime
        elapsed = int(round(1000 * elapsedtime))

        diffcount = [f"{len(v)} {k}" for k, v in diff.dict.items() if len(v) > 0]
        info = ", ".join(diffcount) if diffcount else "no difference"
        __hermes__.logger.debug(
            f"{self.__class__.__name__}: Diffed {len(s)}/{len(o)} entries in"
            f" {elapsed} ms: {info}"
        )
        return diff

    @property
    def inconsistencies(self) -> set[Any]:
        """Returns a copy of the set containing primary keys of all duplicated entries

        Warning: only indicate duplicated entries of first declared source in current
        type, duplicated entries of other sources will be notified in mergeConflicts"""
        return self._inconsistencies.copy()

    def replaceInconsistenciesByCachedValues(self, cache: AnyDataObjectList):
        """Replace each entry filtered for inconsistency or merge conflict by its
        cache value, when existing. A warning is logged for each of those entries,
        whether a cache value was found or not."""
        # Computed once: getPKeys() builds a new set on each call
        cachedpkeys = cache.getPKeys()

        for src, srcname in [
            (self._inconsistencies, "inconsistency"),
            (self._mergeConflicts, "merge conflict"),
        ]:
            for pkey in src:
                if pkey in cachedpkeys:
                    self._datadict[pkey] = cache[pkey]
                    __hermes__.logger.warning(
                        f"Entry of pkey {pkey} with {srcname} found in cache,"
                        " using cache value"
                    )
                else:
                    # Data shouldn't contains an entry with this pkey anymore,
                    # nothing to do
                    __hermes__.logger.warning(
                        f"Entry of pkey {pkey} with {srcname} not found in cache,"
                        " ignoring it"
                    )

    @property
    def mergeConflicts(self) -> set[Any]:
        """Returns a copy of the set containing primary keys of each entry with a
        merge conflict (i.e. when the same attribute has different values on
        different sources)"""
        return self._mergeConflicts.copy()

    def extend(self, other: AnyDataObjectList):
        """Extend current DataObjectList content with the specified other's content,
        by reference.

        The primary keys of "other" must not exist in current instance, otherwise
        a KeyError exception will be raised, and current instance is left
        unchanged.
        """
        if not self._datadict.keys().isdisjoint(other._datadict.keys()):
            raise KeyError(
                "Unable to extend, as current instance and 'other' contains some"
                " objects with the same primary key"
            )

        self._datadict |= other._datadict