Coverage for lib / datamodel / dataobjectlist.py: 99%

154 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:11 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import TypeVar, Any, Iterable 

24 

25import time 

26 

27from lib.datamodel.diffobject import DiffObject 

28from lib.datamodel.dataobject import DataObject, HermesMergingConflictError 

29from lib.datamodel.foreignkey import ForeignKey 

30from lib.datamodel.serialization import LocalCache 

31 

32AnyDataObjectList = TypeVar("AnyDataObjectList", bound="DataObjectList") 

33 

34 

class DataObjectList(LocalCache):
    """Generic serializable list of DataObject

    Subclasses must define the following class vars:
    - OBJTYPE: data type contained in list, mandatory for deserialization

    The class provides
    - data storage
    - index creation for DataObject.PRIMARYKEY_ATTRIBUTE.
        Indexed data is accessible by self[pkeyvalue].
    - inconsistencies (duplicates) detection and replacement by cache values
    - mergeConflicts detection and replacement by cache values
    - merge_constraints and integrity_constraints filtered items storage with respective
        attributes mergeFiltered and integrityFiltered
    - json serialization/deserialization
    - diffFrom() function generating DiffObject
    """

    OBJTYPE: "type[DataObject]" = DataObject
    """Object type stored by current class"""

    FOREIGNKEYS: list[ForeignKey] = []
    """Foreign keys of current OBJTYPE"""

    def __init__(
        self,
        objlist: list[DataObject] | None = None,
        from_json_dict: list[dict[str, Any]] | None = None,
    ):
        """Create a new instance, with data coming from json (for deserialization),
        or from specified list of DataObject.

        If data is from json, every objects will be instantiated by deserialization.
        If data is from objlist, every object of another type than self.OBJTYPE will
        be casted to OBJTYPE.

        Exactly one of objlist and from_json_dict must be specified, otherwise an
        AttributeError is raised.
        """
        super().__init__(jsondataattr="_data")

        self._inconsistencies: set[Any] = set()
        """Set containing primary keys of all duplicated entries"""

        self._mergeConflicts: set[Any] = set()
        """Set containing primary keys of each entry with a merge conflict (i.e. when
        the same attribute has different values on different sources)"""

        self.mergeFiltered: set[Any] = set()
        """Set containing primary keys of each entry filtered by merge constraints"""

        self.integrityFiltered: set[Any] = set()
        """Set containing primary keys of each entry filtered by integrity
        constraints"""

        self._datadict: dict[Any, DataObject] = {}
        """Dictionary containing the data, with primary keys as keys, and DataObject as
        values"""

        if objlist is None and from_json_dict is None:
            err = (
                "Cannot instantiate object from nothing: you must specify one data"
                " source"
            )
            __hermes__.logger.critical(err)
            raise AttributeError(err)

        if objlist is not None and from_json_dict is not None:
            err = "Cannot instantiate object from multiple data sources at once"
            __hermes__.logger.critical(err)
            raise AttributeError(err)

        # Exactly one source is set at this point
        if objlist is not None:
            self.__init_from_objlist__(objlist)
        else:
            self.__init_from_json_dict__(from_json_dict)

    def __init_from_json_dict__(self, from_json_dict: list[dict[str, Any]]):
        """Fill current instance with data coming from json.
        Every objects in list will be instantiated by deserialization."""
        self.__init_from_objlist__(
            [self.OBJTYPE(from_json_dict=item) for item in from_json_dict]
        )

    def __init_from_objlist__(self, objlist: list[DataObject]):
        """Fill current instance with data from specified list of DataObject.
        Every object of another type than self.OBJTYPE will be casted to OBJTYPE.
        """
        for obj in objlist:
            self.append(obj)

    @property
    def _data(self) -> list[DataObject]:
        """Returns a list of current DataObject values, sorted by primary key"""
        return [self._datadict[k] for k in sorted(self._datadict)]

    def __len__(self) -> int:
        """Returns the number of items in current instance"""
        return len(self._datadict)

    def __iter__(self) -> Iterable:
        """Returns an iterator of current DataObject values"""
        return iter(self._datadict.values())

    def __getitem__(self, objOrPkey: Any) -> DataObject:
        """Indexer operator '[]' returning DataObject entry with same pkey as specified
        one, or as specified DataObject.

        Raises KeyError if no entry exists for the resulting pkey."""
        if isinstance(objOrPkey, DataObject):
            return self._datadict[objOrPkey.getPKey()]
        return self._datadict[objOrPkey]

    def __contains__(self, objOrPkey: Any) -> bool:
        """'in' operator: return True if specified DataObject or pkey exists in current
        instance"""
        if isinstance(objOrPkey, DataObject):
            return objOrPkey.getPKey() in self._datadict
        return objOrPkey in self._datadict

    def get(self, pkey: Any, __default: Any = None) -> Any:
        """Returns DataObject entry with specified pkey, or __default value if no entry
        was found"""
        return self._datadict.get(pkey, __default)

    def getPKeys(self) -> set[Any]:
        """Returns a new set containing each primary key of current DataObject
        values"""
        return set(self._datadict)

    def append(self, obj: DataObject, ignoreIfAlreadyPresent: bool = False):
        """Append specified object to current instance.

        If obj is of another type than self.OBJTYPE, it will be casted to OBJTYPE.

        If ignoreIfAlreadyPresent is False (default):
        - an obj whose pkey is already known as an inconsistency or a merge conflict
          is silently ignored
        - an obj whose pkey already exists in current instance is considered as a
          duplicate: its pkey is put in _inconsistencies, and the entry previously
          stored is removed from current instance

        If ignoreIfAlreadyPresent is True, an obj whose pkey already exists in current
        instance is left untouched, and nothing is recorded.
        """
        if type(obj) is self.OBJTYPE:
            objconverted = obj
        else:
            # Recreate object with the required type
            # (useful when merging data from datamodel)
            objconverted = self.OBJTYPE(from_json_dict=obj.toNative())

        pkey = objconverted.getPKey()
        if not ignoreIfAlreadyPresent and (
            pkey in self._inconsistencies or pkey in self._mergeConflicts
        ):
            # Already known as an inconsistency or a merge conflict: ignore it.
            # Two membership tests avoid building a merged set on each call.
            return

        if pkey not in self._datadict:
            self._datadict[pkey] = objconverted
        elif not ignoreIfAlreadyPresent:
            __hermes__.logger.warning(
                f"<{self.__class__.__name__}> Trying to insert an already existing"
                f" object: {objconverted=}"
            )
            self._inconsistencies.add(pkey)
            del self._datadict[pkey]

    def replace(self, obj: DataObject):
        """Replace specified DataObject (i.e. with same pkey, but different values) in
        current instance.

        Raises IndexError if no entry with the same pkey exists."""
        pkey = obj.getPKey()
        if pkey not in self._datadict:
            raise IndexError(
                f"Cannot replace object with pkey {pkey} as previous doesn't exist"
            )
        self._datadict[pkey] = obj

    def removeByPkey(self, pkey: Any):
        """Remove DataObject corresponding to specified pkey from current instance.
        Does nothing if no entry exists for this pkey."""
        self._datadict.pop(pkey, None)

    def remove(self, obj: DataObject):
        """Remove specified DataObject from current instance"""
        self.removeByPkey(obj.getPKey())

    def toNative(self) -> list[dict[str, Any]]:
        """Return a list of complete data dict of current DataObject values"""
        return [item.toNative() for item in self._datadict.values()]

    def mergeWith(
        self,
        objlist: list[DataObject],
        pkeyMergeConstraint: str,
        dontMergeOnConflict=False,
    ) -> set[Any]:
        """Merge specified objlist data in current instance.

        pkeyMergeConstraint must be one of "noConstraint", "mustNotExist",
        "mustAlreadyExist" or "mustExistInBoth", otherwise an AttributeError is
        raised.

        dontMergeOnConflict is passed to DataObject.mergeWith(). When that call raises
        HermesMergingConflictError, the pkey is put in mergeConflicts and the item is
        removed from current instance.

        Returns a set containing pkeys of items filtered by pkeyMergeConstraint
        """

        validsPkeyMergeConstraints = (
            "noConstraint",
            "mustNotExist",
            "mustAlreadyExist",
            "mustExistInBoth",
        )

        if pkeyMergeConstraint not in validsPkeyMergeConstraints:
            raise AttributeError(
                f"Specified {pkeyMergeConstraint=} is invalid."
                f" Valid values are {validsPkeyMergeConstraints}"
            )

        pkeysMerged = set()
        pkeysToRemove = set()
        pkeysIgnored = set()

        for obj in objlist:
            pkey = obj.getPKey()
            # Test membership directly on the dict: getPKeys() would copy every key
            # into a new set on each iteration
            if pkey not in self._datadict:
                if pkeyMergeConstraint in ("noConstraint", "mustNotExist"):
                    # Constraint is respected, add object
                    pkeysMerged.add(pkey)
                    self.append(obj)
                elif pkeyMergeConstraint in ("mustAlreadyExist", "mustExistInBoth"):
                    # Constraint isn't respected, don't merge object, nothing else to do
                    pkeysIgnored.add(pkey)
            else:
                if pkeyMergeConstraint in (
                    "noConstraint",
                    "mustAlreadyExist",
                    "mustExistInBoth",
                ):
                    # Constraint is respected, merge object
                    pkeysMerged.add(pkey)
                    newobj = self[pkey]
                    try:
                        newobj.mergeWith(obj, dontMergeOnConflict)
                    except HermesMergingConflictError:
                        self._mergeConflicts.add(pkey)
                        self.removeByPkey(pkey)
                    else:
                        # Store the merged object back under its pkey
                        self.replace(newobj)
                elif pkeyMergeConstraint == "mustNotExist":
                    # Constraint isn't respected, remove object
                    pkeysToRemove.add(pkey)

        if pkeyMergeConstraint == "mustExistInBoth":
            # Entries of current instance that were absent from objlist must go too
            pkeysToRemove |= self.getPKeys() - pkeysMerged

        for pkey in pkeysToRemove:
            self.removeByPkey(pkey)

        __hermes__.logger.debug(
            f"pkey_merge_constraints: merged {len(pkeysMerged)} objects, ignored"
            f" {len(pkeysIgnored)} objects, removed {len(pkeysToRemove)} objects from"
            f" {type(self)}"
        )

        return pkeysIgnored | pkeysToRemove

    def diffFrom(self, other: AnyDataObjectList) -> DiffObject:
        """Returns a DiffObject containing differences between current instance and
        specified 'other', assuming current is the newest"""
        # perf_counter() is monotonic: the measured duration can't be distorted by a
        # system clock adjustment
        starttime = time.perf_counter()
        diff = DiffObject()

        s = self.getPKeys()
        o = other.getPKeys()
        commonattrs = s & o

        diff.appendRemoved([other[pkey] for pkey in (o - s)])
        diff.appendAdded([self[pkey] for pkey in (s - o)])

        # Iterate over the dict rather than over the set of common pkeys, to compare
        # entries in the storage order of current instance
        for pkey, obj in self._datadict.items():
            if pkey in commonattrs:
                diffobj = obj.diffFrom(other[pkey])
                if diffobj:
                    diff.appendModified(diffobj)

        elapsedtime = time.perf_counter() - starttime
        elapsed = int(round(1000 * elapsedtime))

        diffcount = [f"{len(v)} {k}" for k, v in diff.dict.items() if len(v) > 0]
        info = ", ".join(diffcount) if diffcount else "no difference"
        __hermes__.logger.debug(
            f"{self.__class__.__name__}: Diffed {len(s)}/{len(o)} entries in"
            f" {elapsed} ms: {info}"
        )
        return diff

    @property
    def inconsistencies(self) -> set[Any]:
        """Returns a copy of the set containing primary keys of all duplicated entries

        Warning: only indicate duplicated entries of first declared source in current
        type, duplicated entries of other sources will be notified in mergeConflicts"""
        return self._inconsistencies.copy()

    def replaceInconsistenciesByCachedValues(self, cache: AnyDataObjectList):
        """Replace each entry filtered for inconsistency or merge conflict by their
        cache value, when existing"""
        # Computed once: getPKeys() builds a new set on each call
        cachedPkeys = cache.getPKeys()

        for src, srcname in [
            (self._inconsistencies, "inconsistency"),
            (self._mergeConflicts, "merge conflict"),
        ]:
            for pkey in src:
                if pkey in cachedPkeys:
                    self._datadict[pkey] = cache[pkey]
                    __hermes__.logger.warning(
                        f"Entry of pkey {pkey} with {srcname} found in cache,"
                        " using cache value"
                    )
                else:
                    # Data shouldn't contains an entry with this pkey anymore,
                    # nothing to do
                    __hermes__.logger.warning(
                        f"Entry of pkey {pkey} with {srcname} not found in cache,"
                        " ignoring it"
                    )

    @property
    def mergeConflicts(self) -> set[Any]:
        """Returns a copy of the set containing primary keys of each entry with a merge
        conflict (i.e. when the same attribute has different values on different
        sources)"""
        return self._mergeConflicts.copy()

    def extend(self, other: AnyDataObjectList):
        """Extend current DataObjectList content with the specified other's content,
        by reference.

        The primary keys of "other" must not exist in current instance, otherwise
        a KeyError exception will be raised.
        """
        if not self._datadict.keys().isdisjoint(other._datadict.keys()):
            raise KeyError(
                "Unable to extend, as current instance and 'other' contains some"
                " objects with the same primary key"
            )

        self._datadict |= other._datadict

371 ) 

372 

373 self._datadict |= other._datadict