Coverage for lib/datamodel/serialization.py: 83%

233 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import TypeVar, Any, TYPE_CHECKING 

24 

25if TYPE_CHECKING: # pragma: no cover 

26 # Only for type hints, won't import at runtime 

27 from typing import Callable, IO 

28 from lib.config import HermesConfig 

29 

30from lib.version import HERMES_VERSION, HERMES_VERSIONS 

31from datetime import datetime 

32from tempfile import NamedTemporaryFile 

33import base64 

34import json 

35import os 

36import os.path 

37import gzip 

38import re 

39 

# "Self" type variables: classmethods annotated with `cls: type[AnyX]` return an
# instance of the concrete subclass they were called on.
AnyJSONSerializable = TypeVar("AnyJSONSerializable", bound="JSONSerializable")
AnyLocalCache = TypeVar("AnyLocalCache", bound="LocalCache")


class HermesInvalidVersionError(Exception):
    """Raised when the previous or the current HERMES_VERSION cannot be found in
    HERMES_VERSIONS"""


class HermesInvalidJSONError(Exception):
    """Raised when the JSON string given to from_json() cannot be decoded"""


class HermesInvalidJSONDataError(Exception):
    """Raised when the data given to from_json() is neither a str nor a dict"""


class HermesInvalidJSONDataattrTypeError(Exception):
    """Raised when the specified jsondataattr is not a str, list, tuple or set"""


class HermesInvalidCacheDirError(Exception):
    """Raised when the cache dir exists but is not a directory or not writeable"""


class HermesUnspecifiedCacheFilename(Exception):
    """Raised when saving the cache file of an instance whose cache filename has
    never been set with setCacheFilename()"""


class HermesLocalCacheNotSetupError(Exception):
    """Raised when LocalCache is used before LocalCache.setup() has been called"""

73 

74 

class JSONEncoder(json.JSONEncoder):
    """JSON encoder handling the extra types Hermes serializes.

    - datetime: "HermesDatetime(YYYY-MM-DDTHH:MM:SSZ)" (tzinfo is dropped without
      any conversion, and the time is truncated to seconds)
    - bytes: "HermesBytes(<base64 data>)"
    - JSONSerializable: the value returned by its _get_jsondict()
    - set: a sorted list of its items
    Any other type is delegated to json.JSONEncoder.default().
    """

    def default(self, obj: Any) -> Any:
        if isinstance(obj, datetime):
            # The tzinfo is discarded as is, then a "Z" suffix is appended to
            # build the internal marker string
            naive = obj.replace(tzinfo=None)
            stamp = naive.isoformat(timespec="seconds")
            return f"HermesDatetime({stamp}Z)"

        if isinstance(obj, bytes):
            encoded = base64.b64encode(obj).decode("ascii")
            return f"HermesBytes({encoded})"

        if isinstance(obj, JSONSerializable):
            return obj._get_jsondict()

        if isinstance(obj, set):
            items = list(obj)
            items.sort()
            return items

        return json.JSONEncoder.default(self, obj)

90 

91 

class JSONSerializable:
    """Class to extend in order to obtain JSON serialization/deserialization.

    Children classes have to:
      - offer a constructor that is callable with the named parameter
        'from_json_dict' only (extra keyword arguments given to from_json() are
        forwarded to it too)
      - specify the jsondataattr, whose behavior depends on its type:
          - str: name of their instance attribute (dict) containing the data to
            serialize
          - list | tuple | set: names of the instance attributes to serialize.
            The JSON will have each attr name as key, and their content as values

    Children classes may also define callables named
    "migrate_from_v<from>_to_v<to>" (with "." and "-" replaced by "_"). from_json()
    calls them, one version step at a time, with the content of data saved by an
    older HERMES_VERSION, and each must return the content converted to the <to>
    version format.
    """

    # Internal marker strings emitted by JSONEncoder, converted back to datetime
    # and bytes by _json_parser(). Compiled once instead of on every string.
    _HERMES_DATETIME_RE = re.compile(
        r"HermesDatetime\(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z\)"
    )
    _HERMES_BYTES_RE = re.compile(r"HermesBytes\([^)]*\)")

    def __init__(self, jsondataattr: str | list[str] | tuple[str] | set[str]):
        """Store jsondataattr.

        Raises:
            HermesInvalidJSONDataattrTypeError: jsondataattr is not exactly a str,
                list, tuple or set
        """
        if type(jsondataattr) not in (str, list, tuple, set):
            raise HermesInvalidJSONDataattrTypeError(
                f"Invalid jsondataattr type '{type(jsondataattr)}'."
                " It must be one of the following types: [str, list, tuple, set]"
            )
        # Name(s) of the instance attribute(s) containing the data to serialize,
        # see the class docstring for the meaning of each accepted type
        self._jsondataattr: str | list[str] | tuple[str] | set[str] = jsondataattr

    def _get_jsondict(self) -> dict[str, Any]:
        """Return the data to serialize, according to self._jsondataattr"""
        if type(self._jsondataattr) is str:
            return getattr(self, self._jsondataattr)
        elif type(self._jsondataattr) in (list, tuple, set):
            return {attr: getattr(self, attr) for attr in self._jsondataattr}
        else:
            raise HermesInvalidJSONDataattrTypeError(
                f"Invalid _jsondataattr type '{type(self._jsondataattr)}'."
                " It must be one of the following types: [str, list, tuple, set]"
            )

    def to_json(self, forCacheFile: bool = False) -> str:
        """Serialize the instance data to an indented JSON string with JSONEncoder.

        If forCacheFile is True, the data is wrapped in a dict alongside the current
        HERMES_VERSION ({"__HERMES_VERSION__": ..., "content": ...}), allowing
        from_json() to migrate it if the version has changed since.
        Top-level data that is not a dict is sorted. If it can't be, a warning is
        logged and it is exported unsorted.
        """
        if forCacheFile:
            data = {
                "__HERMES_VERSION__": HERMES_VERSION,
                "content": self._get_jsondict(),
            }
        else:
            data = self._get_jsondict()

        if not isinstance(data, dict):
            try:
                data = sorted(data)
            except TypeError:
                __hermes__.logger.warning(
                    f"Unsortable type {type(self)} exported as JSON."
                    " You should consider to set it sortable"
                )
        return json.dumps(data, cls=JSONEncoder, indent=4)

    @classmethod
    def __migrateData(
        cls: type[AnyJSONSerializable],
        from_ver: str,
        to_ver: str,
        jsondict: Any | dict[Any, Any],
    ) -> Any | dict[Any, Any]:
        """Migrate jsondict from from_ver to to_ver, one HERMES_VERSIONS step at a
        time, calling each "migrate_from_v<from>_to_v<to>" callable the class
        defines. Steps without such a callable leave the data unchanged.

        Raises:
            HermesInvalidVersionError: from_ver or to_ver is not in
                HERMES_VERSIONS, or to_ver is older than from_ver
        """
        try:
            start = HERMES_VERSIONS.index(from_ver)
        except ValueError:
            errmsg = f"Previous version {from_ver} not found in known HERMES_VERSIONS"
            __hermes__.logger.critical(errmsg)
            raise HermesInvalidVersionError(errmsg)

        try:
            stop = HERMES_VERSIONS.index(to_ver, start)
        except ValueError:
            if to_ver in HERMES_VERSIONS:
                # to_ver is known, but only before from_ver: this is a downgrade
                errmsg = (
                    f"Current version {to_ver} is older than previous version"
                    f" {from_ver}: unable to migrate data"
                )
            else:
                errmsg = f"Current version {to_ver} not found in known HERMES_VERSIONS"
            __hermes__.logger.critical(errmsg)
            raise HermesInvalidVersionError(errmsg)

        for idx in range(start, stop):
            step_from, step_to = HERMES_VERSIONS[idx : idx + 2]
            methodName = (
                f"migrate_from_v{step_from}_to_v{step_to}"
                .replace(".", "_")
                .replace("-", "_")
            )
            method = getattr(cls, methodName, None)
            if not callable(method):
                # No migration defined for this step: data is kept as is
                continue
            __hermes__.logger.info(
                f"About to migrate {cls.__name__} cache file format from "
                f"v{step_from} to v{step_to} : calling '{methodName}()'"
            )
            jsondict = method(jsondict)

        return jsondict

    @classmethod
    def from_json(
        cls: type[AnyJSONSerializable],
        jsondata: str | dict[Any, Any],
        **kwargs: None | Any,
    ) -> AnyJSONSerializable:
        """Create an instance from jsondata, a JSON string or an already decoded
        dict.

        Strings are decoded, and the "HermesDatetime(...)" / "HermesBytes(...)"
        markers they contain are converted back by _json_parser(). Dicts are used
        as is. If the data is wrapped with a __HERMES_VERSION__ (see
        to_json(forCacheFile=True)) that differs from the current HERMES_VERSION,
        its content is migrated first. The result is passed to the constructor as
        from_json_dict, along with kwargs.

        Raises:
            HermesInvalidJSONError: jsondata is a str that isn't valid JSON
            HermesInvalidJSONDataError: jsondata is neither a str nor a dict
            HermesInvalidVersionError: the data version can't be migrated
        """
        if type(jsondata) is str:
            try:
                jsondict = json.loads(jsondata, object_hook=cls._json_parser)
            except json.decoder.JSONDecodeError as e:
                raise HermesInvalidJSONError(str(e)) from e
            if not isinstance(jsondict, dict):
                # object_hook is only called for JSON objects: markers that aren't
                # nested in any object (e.g. items of a top-level list, as emitted
                # by to_json() for sorted non-dict data) must be converted here
                jsondict = cls._json_parser(jsondict)
        elif isinstance(jsondata, dict):
            jsondict = jsondata
        else:
            raise HermesInvalidJSONDataError(
                f"The 'jsondata' arg must be a str or a dict."
                f" Here we have '{type(jsondata)}'"
            )

        if type(jsondict) is dict and jsondict.keys() == {
            "__HERMES_VERSION__",
            "content",
        }:
            version = jsondict["__HERMES_VERSION__"]
            jsondict = jsondict["content"]
        else:
            version = HERMES_VERSION

        if version != HERMES_VERSION:
            jsondict = cls.__migrateData(
                from_ver=version, to_ver=HERMES_VERSION, jsondict=jsondict
            )

        return cls(from_json_dict=jsondict, **kwargs)

    @classmethod
    def _json_parser(cls: type[AnyJSONSerializable], value: Any) -> Any:
        """Recursively convert internal marker strings back to their values.

        "HermesDatetime(yyyy-mm-ddThh:mm:ssZ)" becomes a naive datetime and
        "HermesBytes(<base64>)" becomes bytes. Dicts and lists are updated in
        place. Marker strings whose payload can't be decoded are kept as is.
        """
        if isinstance(value, dict):
            for k, v in value.items():
                value[k] = cls._json_parser(v)
        elif isinstance(value, list):
            for index, row in enumerate(value):
                value[index] = cls._json_parser(row)
        elif isinstance(value, str) and value:
            if JSONSerializable._HERMES_DATETIME_RE.fullmatch(value):
                try:
                    # Strip the 'HermesDatetime(' prefix (15 chars) and the 'Z)'
                    # suffix: timezone handling could create a lot of troubles
                    value = datetime.fromisoformat(value[15:-2])
                except ValueError:
                    # Well-formed but invalid date, e.g. month 13: keep the string
                    pass
            elif JSONSerializable._HERMES_BYTES_RE.fullmatch(value):
                try:
                    # Strip the 'HermesBytes(' prefix (12 chars) and the ')' suffix.
                    # binascii.Error and UnicodeEncodeError are both ValueErrors
                    value = base64.b64decode(value[12:-1].encode("ascii"))
                except ValueError:
                    pass

        return value

258 

259 

class LocalCache(JSONSerializable):
    """Base class to manage local cache files, extending JSONSerializable objects.
    This class offers file management, compression, rotation, and the ability to
    create an instance from a cache file or to save the current instance to a
    cache file.

    LocalCache.setup() must have been called for the current app before use.
    """

    # Class settings (read by the accessors below) stored by appname to be thread
    # safe (only necessary for functional tests)
    _settingsbyappname: dict[str, Any] = {}

    # Possible cache files extensions according to LocalCache._compressCache() value
    _extensions: dict[bool, str] = {True: ".json.gz", False: ".json"}

    # Width of the zero-padded index of backup files, e.g. "name.000001.json"
    _backupIdxLen: int = 6

    @staticmethod
    def _currentAppSettings() -> dict[str, Any]:
        """Return the settings stored by setup() for the current app.

        Raises:
            HermesLocalCacheNotSetupError: setup() was never called for this app
        """
        settings = LocalCache._settingsbyappname.get(__hermes__.appname)
        if settings is None:
            raise HermesLocalCacheNotSetupError(
                "LocalCache.setup() has never be called : unable to use the LocalCache"
            )
        return settings

    @staticmethod
    def _backupCount() -> int:
        """Number of backup files to retain"""
        return LocalCache._currentAppSettings()["_backupCount"]

    @staticmethod
    def _cachedir() -> str:
        """Directory where cache file(s) will be stored"""
        return LocalCache._currentAppSettings()["_cachedir"]

    @staticmethod
    def _compressCache() -> bool:
        """Boolean indicating if cache files must be gzipped or stored as plain text"""
        return LocalCache._currentAppSettings()["_compressCache"]

    @staticmethod
    def _extension() -> str:
        """Default cache files extension according to LocalCache._compressCache()
        value"""
        return LocalCache._extensions[LocalCache._currentAppSettings()["_compressCache"]]

    @staticmethod
    def _umask() -> int:
        """Umask currently set"""
        return LocalCache._currentAppSettings()["_umask"]

    @staticmethod
    def _backupSuffix(index: int) -> str:
        """Suffix appended to the cache filename for the backup number `index`"""
        return f".{str(index).zfill(LocalCache._backupIdxLen)}"

    @staticmethod
    def setup(config: "HermesConfig"):
        """Store the cache settings of the current app, read from config["hermes"]"""
        LocalCache._settingsbyappname[__hermes__.appname] = {
            "_backupCount": config["hermes"]["cache"]["backup_count"],
            "_cachedir": config["hermes"]["cache"]["dirpath"],
            "_compressCache": config["hermes"]["cache"]["enable_compression"],
            "_extension": LocalCache._extensions[
                config["hermes"]["cache"]["enable_compression"]
            ],
            "_umask": config["hermes"]["umask"],
        }

    def __init__(
        self,
        jsondataattr: str | list[str] | tuple[str] | set[str],
        cachefilename: str | None = None,
        dontManageCacheDir: bool = False,
    ):
        """Initialize the JSONSerializable part and the cache filename.

        Unless dontManageCacheDir is True, the cache dir is created (honoring the
        configured umask) if it doesn't exist.

        Raises:
            HermesInvalidCacheDirError: the cache dir isn't a writeable directory
        """
        super().__init__(jsondataattr)
        self.setCacheFilename(cachefilename)

        if dontManageCacheDir:
            return

        cachedir = LocalCache._cachedir()
        if not os.path.exists(cachedir):
            __hermes__.logger.info(
                f"Local cache dir '{cachedir}' doesn't exists:"
                " create it"
            )
            try:
                os.makedirs(cachedir, 0o777 & ~LocalCache._umask())
            except Exception as e:
                __hermes__.logger.fatal(
                    f"Unable to create local cache dir '{cachedir}':"
                    f" {str(e)}"
                )
                raise

        if not os.path.isdir(cachedir):
            err = (
                f"Local cache dir '{cachedir}'"
                " exists and is not a directory"
            )
            __hermes__.logger.fatal(err)
            raise HermesInvalidCacheDirError(err)

        if not os.access(cachedir, os.W_OK):
            err = (
                f"Local cache dir '{cachedir}'"
                " exists but is not writeable"
            )
            __hermes__.logger.fatal(err)
            raise HermesInvalidCacheDirError(err)

    def savecachefile(
        self, cacheFilename: str | None = None, dontKeepBackup: bool = False
    ):
        """Save the instance to its cache file, only if its content has changed.

        Args:
            cacheFilename: if specified, replaces the current cache filename
                before saving
            dontKeepBackup: if True, the previous cache file is overwritten
                instead of being rotated as a backup

        Raises:
            HermesUnspecifiedCacheFilename: no cache filename was ever specified
        """
        if cacheFilename is not None:
            self.setCacheFilename(cacheFilename)

        if self._localCache_filename is None:
            raise HermesUnspecifiedCacheFilename(
                "Unable to save cache file without having specified the cacheFilename"
                " with setCacheFilename()"
            )

        # Generate content before everything else to avoid cache corruption in case
        # of failure
        content = self.to_json(forCacheFile=True)

        found, filepath, _ = self._getExistingFilePath(self._localCache_filename)
        if found:
            # Retrieve previous content
            with self._open(filepath, "rt") as f:
                oldcontent = f.read()
        else:
            oldcontent = ""

        # Save only if content has changed
        if content == oldcontent:
            return

        cachedir = LocalCache._cachedir()
        extension = LocalCache._extension()
        destpath = f"{cachedir}/{self._localCache_filename}{extension}"

        # Use a temp file to ensure new data is fully written before rotating old
        # files. It is closed right away, to be reopened with self._open(), which
        # selects gzip compression transparently from the extension
        with NamedTemporaryFile(
            dir=cachedir, suffix=extension, mode="wt", delete=False
        ) as tmp:
            tmpfilepath = tmp.name

        try:
            with self._open(tmpfilepath, "wt") as f:
                f.write(content)
            os.chmod(tmpfilepath, 0o666 & ~LocalCache._umask())

            if not dontKeepBackup:
                self._rotatecachefile(self._localCache_filename)
            # destpath may still exist here (dontKeepBackup): unlike os.rename(),
            # os.replace() overwrites it on every platform
            os.replace(tmpfilepath, destpath)
        except BaseException:
            # Don't leave an orphan temp file in the cache dir
            try:
                os.remove(tmpfilepath)
            except OSError:
                pass
            raise

        # The previous file may have been found with the other extension (the
        # "enable_compression" setting changed). If it wasn't rotated, remove it:
        # _getExistingFilePath() would otherwise return it instead of the new file
        # if the setting is reverted
        if found and filepath != destpath and os.path.exists(filepath):
            __hermes__.logger.debug(f"Deleting '{filepath}'")
            os.remove(filepath)

    def setCacheFilename(self, filename: str | None):
        """Set the cache filename (without directory nor extension) used to save"""
        self._localCache_filename = filename

    @classmethod
    def loadcachefile(
        cls: type[AnyLocalCache], filename: str, **kwargs: None | Any
    ) -> AnyLocalCache:
        """Create an instance from the cache file `filename` (without directory nor
        extension), and set it as the instance cache filename. If the file doesn't
        exist, the instance is created from empty JSON data ("{}"). kwargs are
        forwarded to from_json()."""
        found, filepath, _ = cls._getExistingFilePath(filename)
        if found:
            with cls._open(filepath, "rt") as f:
                jsondata = f.read()
        else:
            __hermes__.logger.info(
                f"Specified cache file '{filepath}' doesn't exists,"
                " returning empty data"
            )
            jsondata = "{}"

        ret = cls.from_json(jsondata, **kwargs)
        ret.setCacheFilename(filename)
        return ret

    @classmethod
    def _getExistingFilePath(
        cls: type[AnyLocalCache], filename: str
    ) -> tuple[bool, str, str | None]:
        """Check if specified filename exists with default extension, or with the
        other one. This allows the user to change the "enable_compression" setting
        without breaking the current cache.

        Returns a tuple (found, filepath, extension)
          - found: boolean indicating if filepath was found
          - filepath: if found: str indicating filepath, otherwise filepath with
            default extension (may be useful for logging)
          - extension: if found: str containing the extension of the filepath
            found, None otherwise
        """
        compress = LocalCache._compressCache()
        cachedir = LocalCache._cachedir()
        for extension in (
            # Extension that should be used according to Config
            LocalCache._extensions[compress],
            # Extension that could be used if settings has changed
            LocalCache._extensions[not compress],
        ):
            filepath = f"{cachedir}/{filename}{extension}"
            if os.path.exists(filepath):
                return (True, filepath, extension)

        # Not found
        return (False, f"{cachedir}/{filename}{LocalCache._extensions[compress]}", None)

    @classmethod
    def _open(cls: type[AnyLocalCache], path: str, mode: str = "r") -> "IO":
        """Open path with gzip.open() if it has the compressed extension, with the
        builtin open() otherwise. Explicit text modes ("t") are UTF-8 encoded."""
        opener = gzip.open if path.endswith(LocalCache._extensions[True]) else open
        if "t" in mode:
            return opener(path, mode, encoding="utf-8")
        # gzip.open() defaults to binary mode, where an encoding is not accepted
        return opener(path, mode)

    @classmethod
    def _rotatecachefile(cls: type[AnyLocalCache], filename: str):
        """Shift the cache file and its backups by one: filename becomes backup
        .000001, backup .000001 becomes .000002, ... up to _backupCount(), whose
        previous content is overwritten. Each file keeps its own extension."""
        cachedir = LocalCache._cachedir()
        for i in range(LocalCache._backupCount(), 0, -1):
            oldname = f"{filename}{LocalCache._backupSuffix(i - 1)}" if i > 1 else filename
            found, oldpath, ext = cls._getExistingFilePath(oldname)
            if found:
                newpath = f"{cachedir}/{filename}{LocalCache._backupSuffix(i)}{ext}"
                os.replace(oldpath, newpath)

    @classmethod
    def deleteAllCacheFiles(cls: type[AnyLocalCache], filename: str):
        """Delete the cache file with specified filename and all its backups
        (.000001 to _backupCount(), as created by _rotatecachefile()), with any of
        the possible extensions"""
        names = [filename] + [
            f"{filename}{LocalCache._backupSuffix(i)}"
            for i in range(1, LocalCache._backupCount() + 1)
        ]
        cachedir = LocalCache._cachedir()
        for name in names:
            for extension in LocalCache._extensions.values():
                path = f"{cachedir}/{name}{extension}"
                if os.path.exists(path):
                    __hermes__.logger.debug(f"Deleting '{path}'")
                    os.remove(path)