Coverage for lib/datamodel/dataobjectlist.py: 99%
153 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:24 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import TypeVar, Any, Iterable
25import time
27from lib.datamodel.diffobject import DiffObject
28from lib.datamodel.dataobject import DataObject, HermesMergingConflictError
29from lib.datamodel.foreignkey import ForeignKey
30from lib.datamodel.serialization import LocalCache
# Type variable bound to DataObjectList: used to annotate parameters that accept
# a DataObjectList or any of its subclasses (see diffFrom(), extend(), ...)
AnyDataObjectList = TypeVar("AnyDataObjectList", bound="DataObjectList")
class DataObjectList(LocalCache):
    """Generic serializable list of DataObject

    Subclasses must define the following class vars:
    - OBJTYPE: data type contained in list, mandatory for deserialization

    The class provides
    - data storage
    - index creation for DataObject.PRIMARYKEY_ATTRIBUTE.
      Indexed data is accessible by self[pkeyvalue].
    - inconsistencies (duplicates) detection and replacement by cache values
    - mergeConflicts detection and replacement by cache values
    - merge_constraints and integrity_constraints filtered items storage with
      respective attributes mergeFiltered and integrityFiltered
    - json serialization/deserialization
    - diffFrom() function generating DiffObject
    """

    OBJTYPE: "type[DataObject]" = DataObject
    """Object type stored by current class"""

    FOREIGNKEYS: list[ForeignKey] = []
    """Foreign keys of current OBJTYPE"""

    def __init__(
        self,
        objlist: list[DataObject] | None = None,
        from_json_dict: list[dict[str, Any]] | None = None,
    ):
        """Create a new instance, with data coming from json (for deserialization),
        or from specified list of DataObject.

        Exactly one of objlist and from_json_dict must be specified, otherwise an
        AttributeError is raised.

        If data is from json, every object will be instantiated by deserialization.
        If data is from objlist, every object of another type than self.OBJTYPE will
        be cast to OBJTYPE
        """
        super().__init__(jsondataattr="_data")

        self._inconsistencies: set[Any] = set()
        """Set containing primary keys of all duplicated entries"""

        self._mergeConflicts: set[Any] = set()
        """Set containing primary keys of each entry with a merge conflict (i.e. when
        the same attribute has different values on different sources)"""

        self.mergeFiltered: set[Any] = set()
        """Set containing primary keys of each entry filtered by merge constraints"""

        self.integrityFiltered: set[Any] = set()
        """Set containing primary keys of each entry filtered by integrity
        constraints"""

        self._datadict: dict[Any, DataObject] = {}
        """Dictionary containing the data, with primary keys as keys, and DataObject as
        values"""

        if objlist is None and from_json_dict is None:
            err = (
                "Cannot instantiate object from nothing: you must specify one data"
                " source"
            )
            __hermes__.logger.critical(err)
            raise AttributeError(err)

        if objlist is not None and from_json_dict is not None:
            err = "Cannot instantiate object from multiple data sources at once"
            __hermes__.logger.critical(err)
            raise AttributeError(err)

        if objlist is not None:
            self.__init_from_objlist__(objlist)
        elif from_json_dict is not None:
            self.__init_from_json_dict__(from_json_dict)

    def __init_from_json_dict__(self, from_json_dict: list[dict[str, Any]]):
        """Fill current instance with data coming from json.
        Every object in list will be instantiated by deserialization."""
        self.__init_from_objlist__(
            [self.OBJTYPE(from_json_dict=item) for item in from_json_dict]
        )

    def __init_from_objlist__(self, objlist: list[DataObject]):
        """Fill current instance with data from specified list of DataObject.
        Every object of another type than self.OBJTYPE will be cast to OBJTYPE.
        """
        for obj in objlist:
            self.append(obj)

    @property
    def _data(self) -> list[DataObject]:
        """Returns a list of current DataObject values, ordered by primary key"""
        return [self._datadict[k] for k in sorted(self._datadict.keys())]

    def __len__(self) -> int:
        """Returns the number of items in current instance"""
        return len(self._datadict)

    def __iter__(self) -> Iterable[DataObject]:
        """Returns an iterator of current DataObject values"""
        return iter(self._datadict.values())

    def __getitem__(self, objOrPkey: Any) -> DataObject:
        """Indexer operator '[]' returning DataObject entry with same pkey as specified
        one, or as specified DataObject"""
        if isinstance(objOrPkey, DataObject):
            return self._datadict[objOrPkey.getPKey()]
        else:
            return self._datadict[objOrPkey]

    def __contains__(self, objOrPkey: Any) -> bool:
        """'in' operator: return True if specified DataObject or pkey exists in current
        instance"""
        if isinstance(objOrPkey, DataObject):
            return objOrPkey.getPKey() in self._datadict
        else:
            return objOrPkey in self._datadict

    def get(self, pkey: Any, __default: Any = None) -> Any:
        """Returns DataObject entry with specified pkey, or __default value if no entry
        was found"""
        return self._datadict.get(pkey, __default)

    def getPKeys(self) -> set[Any]:
        """Returns a new set containing each primary key of current DataObject
        values"""
        return set(self._datadict.keys())

    def append(self, obj: DataObject):
        """Append specified object to current instance.
        If obj is of another type than self.OBJTYPE, it will be cast to OBJTYPE.
        If obj's pkey is already known as an inconsistency or a merge conflict, obj is
        silently ignored.
        If obj's pkey is already in current instance, the pkey will be put in
        _inconsistencies and the existing entry will be removed
        """
        if type(obj) is self.OBJTYPE:
            objconverted = obj
        else:
            # Recreate object with the required type
            # (useful when merging data from datamodel)
            objconverted = self.OBJTYPE(from_json_dict=obj.toNative())

        pkey = objconverted.getPKey()

        # Entries already flagged must stay out of the data: test each set directly
        # instead of building a temporary union set on every call
        if pkey in self._inconsistencies or pkey in self._mergeConflicts:
            return

        if pkey not in self._datadict:
            self._datadict[pkey] = objconverted
        else:
            __hermes__.logger.warning(
                f"<{self.__class__.__name__}> Trying to insert an already existing"
                f" object: {objconverted=}"
            )
            # Duplicate: neither the previous nor the new value can be trusted
            self._inconsistencies.add(pkey)
            del self._datadict[pkey]

    def replace(self, obj: DataObject):
        """Replace specified DataObject (i.e. with same pkey, but different values) in
        current instance.

        Raises IndexError if no entry with the same pkey exists"""
        pkey = obj.getPKey()
        if pkey not in self._datadict:
            raise IndexError(
                f"Cannot replace object with pkey {pkey} as previous doesn't exist"
            )
        self._datadict[pkey] = obj

    def removeByPkey(self, pkey: Any):
        """Remove DataObject corresponding to specified pkey from current instance.
        Does nothing if pkey doesn't exist"""
        if pkey in self._datadict:
            del self._datadict[pkey]

    def remove(self, obj: DataObject):
        """Remove specified DataObject from current instance"""
        self.removeByPkey(obj.getPKey())

    def toNative(self) -> list[dict[str, Any]]:
        """Return a list of complete data dict of current DataObject values"""
        return [item.toNative() for item in self._datadict.values()]

    def mergeWith(
        self,
        objlist: list[DataObject],
        pkeyMergeConstraint: str,
        dontMergeOnConflict=False,
    ) -> set[Any]:
        """Merge specified objlist data in current instance.

        pkeyMergeConstraint must be one of:
        - "noConstraint": objects with an unknown pkey are appended, others are merged
        - "mustNotExist": objects with an unknown pkey are appended, entries whose
          pkey already exists are removed from current instance
        - "mustAlreadyExist": objects with an unknown pkey are ignored, others are
          merged
        - "mustExistInBoth": same as "mustAlreadyExist", and entries of current
          instance that were not merged are removed
        Any other value raises an AttributeError.

        If dontMergeOnConflict is True, pkeys of items with conflict will be put in
        mergeConflicts and items will be removed from current list. Otherwise
        conflicting data of item in current instance will be kept.

        Returns a set containing pkeys of items filtered by pkeyMergeConstraint
        """

        validsPkeyMergeConstraints = (
            "noConstraint",
            "mustNotExist",
            "mustAlreadyExist",
            "mustExistInBoth",
        )

        if pkeyMergeConstraint not in validsPkeyMergeConstraints:
            raise AttributeError(
                f"Specified {pkeyMergeConstraint=} is invalid."
                f" Valid values are {validsPkeyMergeConstraints}"
            )

        pkeysMerged: set[Any] = set()
        pkeysToRemove: set[Any] = set()
        pkeysIgnored: set[Any] = set()

        for obj in objlist:
            pkey = obj.getPKey()
            # Look the pkey up in the dict itself: calling getPKeys() here would copy
            # every key for each merged object
            if pkey not in self._datadict:
                if pkeyMergeConstraint in ("noConstraint", "mustNotExist"):
                    # Constraint is respected, add object
                    pkeysMerged.add(pkey)
                    self.append(obj)
                else:
                    # "mustAlreadyExist" or "mustExistInBoth": constraint isn't
                    # respected, don't merge object, nothing else to do
                    pkeysIgnored.add(pkey)
            elif pkeyMergeConstraint == "mustNotExist":
                # Constraint isn't respected, remove object (once the loop is done)
                pkeysToRemove.add(pkey)
            else:
                # "noConstraint", "mustAlreadyExist" or "mustExistInBoth":
                # constraint is respected, merge object
                pkeysMerged.add(pkey)
                newobj = self[pkey]
                try:
                    newobj.mergeWith(obj, dontMergeOnConflict)
                except HermesMergingConflictError:
                    self._mergeConflicts.add(pkey)
                    self.removeByPkey(pkey)
                else:
                    # newobj may be a new instance, so overwrite current reference
                    # in datadict
                    self.replace(newobj)

        if pkeyMergeConstraint == "mustExistInBoth":
            # Everything that was not found in objlist must be dropped
            pkeysToRemove |= self.getPKeys() - pkeysMerged

        for pkey in pkeysToRemove:
            self.removeByPkey(pkey)

        __hermes__.logger.debug(
            f"pkey_merge_constraints: merged {len(pkeysMerged)} objects, ignored"
            f" {len(pkeysIgnored)} objects, removed {len(pkeysToRemove)} objects from"
            f" {type(self)}"
        )

        return pkeysIgnored | pkeysToRemove

    def diffFrom(self, other: AnyDataObjectList) -> DiffObject:
        """Returns a DiffObject containing differences between current instance and
        specified 'other', assuming current is the newest"""
        # perf_counter() is unaffected by system clock adjustments, unlike time()
        starttime = time.perf_counter()
        diff = DiffObject()

        s = self.getPKeys()
        o = other.getPKeys()
        commonattrs = s & o

        diff.appendRemoved([other[pkey] for pkey in (o - s)])
        diff.appendAdded([self[pkey] for pkey in (s - o)])

        # Walk the dict rather than the 'commonattrs' set to keep storage order
        for pkey, obj in self._datadict.items():
            if pkey in commonattrs:
                diffobj = obj.diffFrom(other[pkey])
                if diffobj:
                    diff.appendModified(diffobj)

        elapsedtime = time.perf_counter() - starttime
        elapsed = int(round(1000 * elapsedtime))

        diffcount = [f"{len(v)} {k}" for k, v in diff.dict.items() if len(v) > 0]
        info = ", ".join(diffcount) if diffcount else "no difference"
        __hermes__.logger.debug(
            f"{self.__class__.__name__}: Diffed {len(s)}/{len(o)} entries in"
            f" {elapsed} ms: {info}"
        )
        return diff

    @property
    def inconsistencies(self) -> set[Any]:
        """Returns a copy of the set containing primary keys of all duplicated entries

        Warning: only indicate duplicated entries of first declared source in current
        type, duplicated entries of other sources will be notified in mergeConflicts"""
        return self._inconsistencies.copy()

    def replaceInconsistenciesByCachedValues(self, cache: AnyDataObjectList):
        """Replace each entry filtered for inconsistency or merge conflict by their
        cache value, when existing"""
        # The cache content doesn't change while looping: compute its pkeys only once
        cachedPKeys = cache.getPKeys()

        for src, srcname in [
            (self._inconsistencies, "inconsistency"),
            (self._mergeConflicts, "merge conflict"),
        ]:
            for pkey in src:
                if pkey in cachedPKeys:
                    self._datadict[pkey] = cache[pkey]
                    __hermes__.logger.warning(
                        f"Entry of pkey {pkey} with {srcname} found in cache,"
                        " using cache value"
                    )
                else:
                    # Data shouldn't contain an entry with this pkey anymore,
                    # nothing to do
                    __hermes__.logger.warning(
                        f"Entry of pkey {pkey} with {srcname} not found in cache,"
                        " ignoring it"
                    )

    @property
    def mergeConflicts(self) -> set[Any]:
        """Returns a copy of the set containing primary keys of each entry with a merge
        conflict (i.e. when the same attribute has different values on different
        sources)"""
        return self._mergeConflicts.copy()

    def extend(self, other: AnyDataObjectList):
        """Extend current DataObjectList content with the specified other's content,
        by reference.

        The primary keys of "other" must not exist in current instance, otherwise
        a KeyError exception will be raised.
        """
        if not self._datadict.keys().isdisjoint(other._datadict.keys()):
            raise KeyError(
                "Unable to extend, as current instance and 'other' contains some"
                " objects with the same primary key"
            )

        self._datadict |= other._datadict