Coverage for lib/datamodel/dataobjectlist.py: 99%

153 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import TypeVar, Any, Iterable 

24 

25import time 

26 

27from lib.datamodel.diffobject import DiffObject 

28from lib.datamodel.dataobject import DataObject, HermesMergingConflictError 

29from lib.datamodel.foreignkey import ForeignKey 

30from lib.datamodel.serialization import LocalCache 

31 

# Type variable standing for DataObjectList or any of its subclasses, used to annotate
# parameters that accept another list instance
AnyDataObjectList = TypeVar("AnyDataObjectList", bound="DataObjectList")

33 

34 

class DataObjectList(LocalCache):
    """Generic serializable list of DataObject

    Subclasses must define the following class vars:
    - OBJTYPE: data type contained in list, mandatory for deserialization

    The class provides
    - data storage
    - index creation for DataObject.PRIMARYKEY_ATTRIBUTE.
        Indexed data is accessible by self[pkeyvalue].
    - inconsistencies (duplicates) detection and replacement by cache values
    - mergeConflicts detection and replacement by cache values
    - merge_constraints and integrity_constraints filtered items storage with
        respective attributes mergeFiltered and integrityFiltered
    - json serialization/deserialization
    - diffFrom() function generating a DiffObject
    """

    OBJTYPE: "type[DataObject]" = DataObject
    """Object type stored by current class"""

    FOREIGNKEYS: list[ForeignKey] = []
    """Foreign keys of current OBJTYPE"""

    def __init__(
        self,
        objlist: list[DataObject] | None = None,
        from_json_dict: list[dict[str, Any]] | None = None,
    ):
        """Create a new instance, with data coming from json (for deserialization),
        or from specified list of DataObject.

        Exactly one of objlist and from_json_dict must be specified.

        If data is from json, every object will be instantiated by deserialization.
        If data is from objlist, every object of another type than self.OBJTYPE will
        be cast to OBJTYPE.

        Raises AttributeError if neither or both data sources are specified.
        """
        super().__init__(jsondataattr="_data")

        self._inconsistencies: set[Any] = set()
        """Set containing primary keys of all duplicated entries"""

        self._mergeConflicts: set[Any] = set()
        """Set containing primary keys of each entry with a merge conflict (i.e. when
        the same attribute has different values on different sources)"""

        self.mergeFiltered: set[Any] = set()
        """Set containing primary keys of each entry filtered by merge constraints"""

        self.integrityFiltered: set[Any] = set()
        """Set containing primary keys of each entry filtered by integrity
        constraints"""

        self._datadict: dict[Any, DataObject] = {}
        """Dictionary containing the data, with primary keys as keys, and DataObject as
        values"""

        if objlist is None and from_json_dict is None:
            err = (
                "Cannot instantiate object from nothing: you must specify one data"
                " source"
            )
            __hermes__.logger.critical(err)
            raise AttributeError(err)

        if objlist is not None and from_json_dict is not None:
            err = "Cannot instantiate object from multiple data sources at once"
            __hermes__.logger.critical(err)
            raise AttributeError(err)

        if objlist is not None:
            self.__init_from_objlist__(objlist)
        elif from_json_dict is not None:
            self.__init_from_json_dict__(from_json_dict)

    def __init_from_json_dict__(self, from_json_dict: list[dict[str, Any]]):
        """Fill current instance with data coming from json.
        Every item of the list will be instantiated as OBJTYPE by deserialization,
        then appended to current instance."""
        self.__init_from_objlist__(
            [self.OBJTYPE(from_json_dict=item) for item in from_json_dict]
        )

    def __init_from_objlist__(self, objlist: list[DataObject]):
        """Fill current instance with data from specified list of DataObject.
        Every object of another type than self.OBJTYPE will be cast to OBJTYPE
        (see append()).
        """
        for obj in objlist:
            self.append(obj)

    @property
    def _data(self) -> list[DataObject]:
        """Returns a list of current DataObject values, ordered by primary key"""
        return [self._datadict[pkey] for pkey in sorted(self._datadict)]

    def __len__(self) -> int:
        """Returns the number of items in current instance"""
        return len(self._datadict)

    def __iter__(self) -> Iterable:
        """Returns an iterator of current DataObject values"""
        return iter(self._datadict.values())

    def __getitem__(self, objOrPkey: Any) -> DataObject:
        """Indexer operator '[]' returning DataObject entry with same pkey as specified
        one, or as specified DataObject.

        Raises KeyError if no entry with this pkey exists."""
        if isinstance(objOrPkey, DataObject):
            return self._datadict[objOrPkey.getPKey()]
        return self._datadict[objOrPkey]

    def __contains__(self, objOrPkey: Any) -> bool:
        """'in' operator: return True if specified DataObject or pkey exists in current
        instance"""
        if isinstance(objOrPkey, DataObject):
            return objOrPkey.getPKey() in self._datadict
        return objOrPkey in self._datadict

    def get(self, pkey: Any, __default: Any = None) -> Any:
        """Returns DataObject entry with specified pkey, or __default value if no entry
        was found"""
        # NOTE: as a double-underscore name declared inside a class body, __default is
        # subject to Python private name mangling: callers should pass it positionally
        return self._datadict.get(pkey, __default)

    def getPKeys(self) -> set[Any]:
        """Returns a new set containing each primary key of current DataObject
        values"""
        return set(self._datadict.keys())

    def append(self, obj: DataObject):
        """Append specified object to current instance.

        If obj is of another type than self.OBJTYPE, it will be cast to OBJTYPE.
        If the pkey of obj is already known as an inconsistency or a merge conflict,
        obj is silently ignored.
        If the pkey of obj already exists in current instance, the pkey is put in
        _inconsistencies and the existing entry is removed.
        """
        if type(obj) is self.OBJTYPE:
            objconverted = obj
        else:
            # Recreate object with the required type
            # (useful when merging data from datamodel)
            objconverted = self.OBJTYPE(from_json_dict=obj.toNative())

        pkey = objconverted.getPKey()
        if pkey in self._inconsistencies | self._mergeConflicts:
            # Already known as an inconsistency or a merge conflict: ignore it
            return

        if pkey not in self._datadict:
            self._datadict[pkey] = objconverted
        else:
            __hermes__.logger.warning(
                f"<{self.__class__.__name__}> Trying to insert an already existing"
                f" object: {objconverted=}"
            )
            # Duplicate: neither the previous entry nor the new one is kept
            self._inconsistencies.add(pkey)
            del self._datadict[pkey]

    def replace(self, obj: DataObject):
        """Replace specified DataObject (i.e. with same pkey, but different values) in
        current instance.

        Raises IndexError if no entry with the same pkey exists."""
        pkey = obj.getPKey()
        if pkey not in self._datadict:
            raise IndexError(
                f"Cannot replace object with pkey {pkey} as previous doesn't exist"
            )
        self._datadict[pkey] = obj

    def removeByPkey(self, pkey: Any):
        """Remove DataObject corresponding to specified pkey from current instance.
        Does nothing if the pkey doesn't exist."""
        if pkey in self._datadict:
            del self._datadict[pkey]

    def remove(self, obj: DataObject):
        """Remove specified DataObject from current instance.
        Does nothing if no entry with the same pkey exists."""
        self.removeByPkey(obj.getPKey())

    def toNative(self) -> list[dict[str, Any]]:
        """Return a list of complete data dict of current DataObject values"""
        return [item.toNative() for item in self._datadict.values()]

    def mergeWith(
        self,
        objlist: list[DataObject],
        pkeyMergeConstraint: str,
        dontMergeOnConflict=False,
    ) -> set[Any]:
        """Merge specified objlist data in current instance.

        pkeyMergeConstraint must be one of:
        - "noConstraint": objects missing from current instance are added, the others
          are merged
        - "mustNotExist": objects missing from current instance are added, the others
          are removed from current instance
        - "mustAlreadyExist": objects missing from current instance are ignored, the
          others are merged
        - "mustExistInBoth": same as "mustAlreadyExist", and entries of current
          instance that were not merged are removed

        dontMergeOnConflict is forwarded to DataObject.mergeWith(). When that call
        raises HermesMergingConflictError, the pkey is put in mergeConflicts and the
        item is removed from current instance.

        Returns a set containing pkeys of items filtered by pkeyMergeConstraint
        (ignored and removed ones).

        Raises AttributeError if pkeyMergeConstraint is invalid.
        """

        validsPkeyMergeConstraints = (
            "noConstraint",
            "mustNotExist",
            "mustAlreadyExist",
            "mustExistInBoth",
        )

        if pkeyMergeConstraint not in validsPkeyMergeConstraints:
            raise AttributeError(
                f"Specified {pkeyMergeConstraint=} is invalid."
                f" Valid values are {validsPkeyMergeConstraints}"
            )

        pkeysMerged = set()
        pkeysToRemove = set()
        pkeysIgnored = set()

        for obj in objlist:
            pkey = obj.getPKey()
            # Test membership directly on the index: calling getPKeys() here would
            # rebuild a full set of keys for each merged object
            if pkey not in self._datadict:
                if pkeyMergeConstraint in ("noConstraint", "mustNotExist"):
                    # Constraint is respected, add object
                    pkeysMerged.add(pkey)
                    self.append(obj)
                elif pkeyMergeConstraint in ("mustAlreadyExist", "mustExistInBoth"):
                    # Constraint isn't respected, don't merge object, nothing else to do
                    pkeysIgnored.add(pkey)
            else:
                if pkeyMergeConstraint in (
                    "noConstraint",
                    "mustAlreadyExist",
                    "mustExistInBoth",
                ):
                    # Constraint is respected, merge object
                    pkeysMerged.add(pkey)
                    newobj = self[pkey]
                    try:
                        newobj.mergeWith(obj, dontMergeOnConflict)
                    except HermesMergingConflictError:
                        self._mergeConflicts.add(pkey)
                        self.removeByPkey(pkey)
                    else:
                        # newobj may be a new instance, so overwrite current reference
                        # in datadict
                        self.replace(newobj)
                elif pkeyMergeConstraint == "mustNotExist":
                    # Constraint isn't respected, remove object
                    pkeysToRemove.add(pkey)

        if pkeyMergeConstraint == "mustExistInBoth":
            # Every remaining entry that wasn't found in objlist must be removed
            pkeysToRemove |= self.getPKeys() - pkeysMerged

        for pkey in pkeysToRemove:
            self.removeByPkey(pkey)

        __hermes__.logger.debug(
            f"pkey_merge_constraints: merged {len(pkeysMerged)} objects, ignored"
            f" {len(pkeysIgnored)} objects, removed {len(pkeysToRemove)} objects from"
            f" {type(self)}"
        )

        return pkeysIgnored | pkeysToRemove

    def diffFrom(self, other: AnyDataObjectList) -> DiffObject:
        """Returns a DiffObject containing differences between current instance and
        specified 'other', assuming current is the newest.

        Entries only found in 'other' are reported as removed, entries only found in
        current instance as added, and entries found in both are compared with
        DataObject.diffFrom() and reported as modified when that diff is truthy."""
        starttime = time.time()
        diff = DiffObject()

        s = self.getPKeys()
        o = other.getPKeys()
        commonattrs = s & o

        diff.appendRemoved([other[pkey] for pkey in (o - s)])
        diff.appendAdded([self[pkey] for pkey in (s - o)])

        for pkey, obj in self._datadict.items():
            if pkey in commonattrs:
                diffobj = obj.diffFrom(other[pkey])
                if diffobj:
                    diff.appendModified(diffobj)

        elapsedtime = time.time() - starttime
        elapsed = int(round(1000 * elapsedtime))

        diffcount = [f"{len(v)} {k}" for k, v in diff.dict.items() if len(v) > 0]
        info = ", ".join(diffcount) if diffcount else "no difference"
        __hermes__.logger.debug(
            f"{self.__class__.__name__}: Diffed {len(s)}/{len(o)} entries in"
            f" {elapsed} ms: {info}"
        )
        return diff

    @property
    def inconsistencies(self) -> set[Any]:
        """Returns a copy of the set containing primary keys of all duplicated entries

        Warning: only indicate duplicated entries of first declared source in current
        type, duplicated entries of other sources will be notified in mergeConflicts"""
        return self._inconsistencies.copy()

    def replaceInconsistenciesByCachedValues(self, cache: AnyDataObjectList):
        """Replace each entry filtered for inconsistency or merge conflict by its cache
        value, when existing"""
        # Computed once: getPKeys() builds a new set on each call
        cachedPkeys = cache.getPKeys()

        for src, srcname in [
            (self._inconsistencies, "inconsistency"),
            (self._mergeConflicts, "merge conflict"),
        ]:
            for pkey in src:
                if pkey in cachedPkeys:
                    self._datadict[pkey] = cache[pkey]
                    __hermes__.logger.warning(
                        f"Entry of pkey {pkey} with {srcname} found in cache,"
                        " using cache value"
                    )
                else:
                    # Data shouldn't contain an entry with this pkey anymore,
                    # nothing to do
                    __hermes__.logger.warning(
                        f"Entry of pkey {pkey} with {srcname} not found in cache,"
                        " ignoring it"
                    )

    @property
    def mergeConflicts(self) -> set[Any]:
        """Returns a copy of the set containing primary keys of each entry with a merge
        conflict (i.e. when the same attribute has different values on different
        sources)"""
        return self._mergeConflicts.copy()

    def extend(self, other: AnyDataObjectList):
        """Extend current DataObjectList content with the specified other's content,
        by reference.

        The primary keys of "other" must not exist in current instance, otherwise
        a KeyError exception will be raised.
        """
        if not self._datadict.keys().isdisjoint(other._datadict.keys()):
            raise KeyError(
                "Unable to extend, as current instance and 'other' contains some"
                " objects with the same primary key"
            )

        self._datadict |= other._datadict

369 

370 self._datadict |= other._datadict