Coverage for lib/datamodel/serialization.py: 83%
233 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import TypeVar, Any, TYPE_CHECKING
25if TYPE_CHECKING: # pragma: no cover
26 # Only for type hints, won't import at runtime
27 from typing import Callable, IO
28 from lib.config import HermesConfig
30from lib.version import HERMES_VERSION, HERMES_VERSIONS
31from datetime import datetime
32from tempfile import NamedTemporaryFile
33import base64
34import json
35import os
36import os.path
37import gzip
38import re
# Type variables bound to the serializable base classes: the classmethods typed
# with ``cls: type[AnyX]`` (from_json, loadcachefile, ...) are thereby declared
# as returning an instance of the class they are invoked on.
AnyJSONSerializable = TypeVar(
    "AnyJSONSerializable",
    bound="JSONSerializable",
)
AnyLocalCache = TypeVar(
    "AnyLocalCache",
    bound="LocalCache",
)
class HermesInvalidVersionError(Exception):
    """Raised when the previous or the current HERMES_VERSION is missing from
    HERMES_VERSIONS (the current one being searched only from the position of the
    previous one onward)"""
class HermesInvalidJSONError(Exception):
    """Error signaled by from_json() when the JSON string it received cannot be
    decoded."""

    pass
class HermesInvalidJSONDataError(Exception):
    """Error signaled by from_json() when its 'jsondata' argument is neither a
    str nor a dict."""

    pass
class HermesInvalidJSONDataattrTypeError(Exception):
    """Error signaled when a jsondataattr (the name(s) of the attribute(s) to
    serialize) is not a str, list, tuple or set."""

    pass
class HermesInvalidCacheDirError(Exception):
    """Raised when the local cache dir exists but isn't a directory or isn't
    writeable"""
class HermesUnspecifiedCacheFilename(Exception):
    """Error signaled when saving the cache file of an instance whose cache
    filename is still None (setCacheFilename() never given a filename)."""

    pass
class HermesLocalCacheNotSetupError(Exception):
    """Error signaled when LocalCache settings are read before LocalCache.setup()
    has been called for the current appname."""

    pass
class JSONEncoder(json.JSONEncoder):
    """json.JSONEncoder subclass able to serialize the extra types Hermes uses.

    Conversions, tried in this order:
    - datetime -> "HermesDatetime(YYYY-MM-DDTHH:MM:SSZ)": any tzinfo is dropped
      and the sub-second part is truncated
    - bytes -> "HermesBytes(<base64 data>)"
    - JSONSerializable -> the value returned by its _get_jsondict()
    - set -> sorted list of its items
    Any other type is handed to json.JSONEncoder.default(), which raises a
    TypeError.
    """

    def default(self, obj: Any) -> Any:
        if isinstance(obj, datetime):
            # NOTE(review): tzinfo is discarded without any conversion to UTC, so an
            # aware non-UTC datetime keeps its local wall-clock time while being
            # suffixed with "Z" — confirm this is intended.
            wallclock = obj.replace(tzinfo=None).isoformat(timespec="seconds")
            return "HermesDatetime(" + wallclock + "Z)"

        if isinstance(obj, bytes):
            encoded = base64.b64encode(obj).decode("ascii")
            return "HermesBytes(" + encoded + ")"

        if isinstance(obj, JSONSerializable):
            return obj._get_jsondict()

        if isinstance(obj, set):
            return sorted(obj)

        # Unsupported type: let the base class raise its TypeError
        return json.JSONEncoder.default(self, obj)
92class JSONSerializable:
93 """Class to extend in order to obtain json serialization/deserialization.
95 Children classes have to:
96 - offer a constructor that must be callable with the named parameter
97 'from_json_dict' only
98 - specify the jsondatattr, with a different behavior function of its type:
99 - str: name of their instance attribute (dict) containing the data to
100 serialize
101 - list | tuple | set: name of the instance attributes to serialize. The json
102 will have each attr name as key, and their content as values
103 """
105 def __init__(self, jsondataattr: str | list[str] | tuple[str] | set[str]):
106 if type(jsondataattr) not in (str, list, tuple, set):
107 raise HermesInvalidJSONDataattrTypeError(
108 f"Invalid jsondataattr type '{type(jsondataattr)}'."
109 " It must be one of the following types: [str, list, tuple, set]"
110 )
111 self._jsondataattr: str | list[str] | tuple[str] | set[str] = jsondataattr
112 """Name of instance attribute containing the data to serialize, with a
113 different behavior function of its type:
114 - str: name of their instance attribute (dict) containing the data to serialize
115 - list | tuple | set: name of the instance attributes to serialize. The json
116 will have each attr name as key, and their content as values
117 """
119 def _get_jsondict(self) -> dict[str, Any]:
120 if type(self._jsondataattr) is str:
121 return getattr(self, self._jsondataattr)
122 elif type(self._jsondataattr) in (list, tuple, set):
123 return {attr: getattr(self, attr) for attr in self._jsondataattr}
124 else:
125 raise HermesInvalidJSONDataattrTypeError(
126 f"Invalid _jsondataattr type '{type(self._jsondataattr)}'."
127 " It must be one of the following types: [str, list, tuple, set]"
128 )
130 def to_json(self, forCacheFile=False) -> str:
131 if forCacheFile:
132 data = {
133 "__HERMES_VERSION__": HERMES_VERSION,
134 "content": self._get_jsondict(),
135 }
136 else:
137 data = self._get_jsondict()
139 try:
140 if not isinstance(data, dict):
141 data = sorted(data)
142 except TypeError:
143 __hermes__.logger.warning(
144 f"Unsortable type {type(self)} exported as JSON."
145 " You should consider to set is sortable"
146 )
147 return json.dumps(data, cls=JSONEncoder, indent=4)
149 @classmethod
150 def __migrateData(
151 cls: type[AnyJSONSerializable],
152 from_ver: str,
153 to_ver: str,
154 jsondict: Any | dict[Any, Any],
155 ) -> Any | dict[Any, Any]:
156 try:
157 start = HERMES_VERSIONS.index(from_ver)
158 except ValueError:
159 errmsg = f"Previous version {from_ver} not found in known HERMES_VERSIONS"
160 __hermes__.logger.critical(errmsg)
161 raise HermesInvalidVersionError(errmsg)
163 try:
164 stop = HERMES_VERSIONS.index(to_ver, start)
165 except ValueError:
166 errmsg = f"Current version {to_ver} not found in known HERMES_VERSIONS"
167 __hermes__.logger.critical(errmsg)
168 raise HermesInvalidVersionError(errmsg)
170 for idx in range(start, stop):
171 from_v, to_v = HERMES_VERSIONS[idx : idx + 2]
172 methodName = (
173 (f"migrate_from_v{from_v}_to_v{to_v}")
174 .replace(".", "_")
175 .replace("-", "_")
176 )
177 method = getattr(cls, methodName, None)
178 if not callable(method):
179 # __hermes__.logger.info(
180 # f"Calling '{methodName}()':"
181 # f" method '{methodName}()' doesn't exists"
182 # )
183 continue
184 __hermes__.logger.info(
185 f"About to migrate {cls.__name__} cache file format from "
186 f"v{from_ver} to v{to_ver} : calling '{methodName}()"
187 )
188 jsondict = method(jsondict)
190 return jsondict
192 @classmethod
193 def from_json(
194 cls: type[AnyJSONSerializable],
195 jsondata: str | dict[Any, Any],
196 **kwargs: None | Any,
197 ) -> AnyJSONSerializable:
198 if type(jsondata) is str:
199 try:
200 jsondict = json.loads(jsondata, object_hook=cls._json_parser)
201 except json.decoder.JSONDecodeError as e:
202 raise HermesInvalidJSONError(str(e))
203 elif isinstance(jsondata, dict):
204 jsondict = jsondata
205 else:
206 raise HermesInvalidJSONDataError(
207 f"The 'jsondata' arg must be a str or a dict."
208 f" Here we have '{type(jsondata)}'"
209 )
211 if (
212 type(jsondict) is dict
213 and len(jsondict) == 2
214 and jsondict.keys() == set(["__HERMES_VERSION__", "content"])
215 ):
216 version = jsondict["__HERMES_VERSION__"]
217 jsondict = jsondict["content"]
218 else:
219 version = HERMES_VERSION
221 if version != HERMES_VERSION:
222 jsondict = cls.__migrateData(
223 from_ver=version, to_ver=HERMES_VERSION, jsondict=jsondict
224 )
226 return cls(from_json_dict=jsondict, **kwargs)
228 @classmethod
229 def _json_parser(cls: type[AnyJSONSerializable], value: Any) -> Any:
230 if isinstance(value, dict):
231 for k, v in value.items():
232 value[k] = cls._json_parser(v)
233 elif isinstance(value, list):
234 for index, row in enumerate(value):
235 value[index] = cls._json_parser(row)
236 elif isinstance(value, str) and value:
237 # HermesDatetime
238 if re.fullmatch(
239 r"HermesDatetime\(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z\)", value
240 ):
241 # String have to match internal isoformat to be converted to datetime:
242 # "HermesDatetime(yyyy-mm-ddThh:mm:ssZ)"
243 try:
244 # Ignore internal isformat container and trailing "Z":
245 # handle timezone could create a lot of troubles
246 value = datetime.fromisoformat(value[15:-2])
247 except ValueError:
248 pass
250 # HermesBytes
251 elif re.fullmatch(r"HermesBytes\([^)]*\)", value):
252 try:
253 value = base64.b64decode(value[12:-1].encode("ascii"))
254 except Exception:
255 pass
257 return value
class LocalCache(JSONSerializable):
    """Base class to manage local cache file, extending JSONSerializable objects.

    This class offers file management, compression, rotation, and the ability to
    create an instance from a cache file or to save the current instance to a
    cache file.

    The main cache file path is "<cachedir>/<filename><ext>", where <ext> is
    ".json.gz" when compression is enabled and ".json" otherwise. Backups (see
    _rotatecachefile()) are named "<cachedir>/<filename>.<6-digit index><ext>",
    index 1 being the most recent one.
    """

    # Class settings (values returned by the static accessors below) stored by
    # appname to be thread safe (only necessary for functional tests)
    _settingsbyappname: dict[str, Any] = {}

    # Possible cache files extensions according to LocalCache._compressCache() value
    _extensions: dict[bool, str] = {True: ".json.gz", False: ".json"}

    # Number of digits of the zero-padded index of backup filenames
    _backupIdxLen: int = 6

    @staticmethod
    def _settings() -> dict[str, Any]:
        """Return the settings stored by setup() for the current appname.

        Raises:
            HermesLocalCacheNotSetupError: if setup() has never been called for
                the current appname
        """
        try:
            return LocalCache._settingsbyappname[__hermes__.appname]
        except KeyError:
            raise HermesLocalCacheNotSetupError(
                "LocalCache.setup() has never be called : unable to use the LocalCache"
            ) from None

    @staticmethod
    def _backupCount() -> int:
        """Number of backup files to retain"""
        return LocalCache._settings()["_backupCount"]

    @staticmethod
    def _cachedir() -> str:
        """Directory where cache file(s) will be stored"""
        return LocalCache._settings()["_cachedir"]

    @staticmethod
    def _compressCache() -> bool:
        """Boolean indicating if cache files must be gzipped or stored as plain
        text"""
        return LocalCache._settings()["_compressCache"]

    @staticmethod
    def _extension() -> str:
        """Default cache files extension according to LocalCache._compressCache()
        value"""
        return LocalCache._extensions[LocalCache._settings()["_compressCache"]]

    @staticmethod
    def _umask() -> int:
        """Umask currently set"""
        return LocalCache._settings()["_umask"]

    @staticmethod
    def _backupSuffix(idx: int) -> str:
        """Return the suffix appended to a cache filename for the backup number
        idx, e.g. ".000001" for idx=1"""
        return f".{str(idx).zfill(LocalCache._backupIdxLen)}"

    @staticmethod
    def setup(config: "HermesConfig"):
        """Store the cache settings of the current appname, read from
        config["hermes"]["cache"] and config["hermes"]["umask"]."""
        LocalCache._settingsbyappname[__hermes__.appname] = {
            "_backupCount": config["hermes"]["cache"]["backup_count"],
            "_cachedir": config["hermes"]["cache"]["dirpath"],
            "_compressCache": config["hermes"]["cache"]["enable_compression"],
            "_extension": LocalCache._extensions[
                config["hermes"]["cache"]["enable_compression"]
            ],
            "_umask": config["hermes"]["umask"],
        }

    def __init__(
        self,
        jsondataattr: str | list[str] | tuple[str] | set[str],
        cachefilename: str | None = None,
        dontManageCacheDir: bool = False,
    ):
        """Initialize the serialization settings and the cache filename, then
        ensure the cache dir is usable.

        Args:
            jsondataattr: name(s) of the attribute(s) to serialize, see
                JSONSerializable
            cachefilename: cache filename (without directory nor extension), may
                be set later with setCacheFilename()
            dontManageCacheDir: if True, skip the cache dir creation and checks

        Raises:
            HermesInvalidCacheDirError: if the cache dir exists but isn't a
                directory or isn't writeable
        """
        super().__init__(jsondataattr)
        self.setCacheFilename(cachefilename)

        if dontManageCacheDir:
            return

        cachedir = LocalCache._cachedir()
        if not os.path.exists(cachedir):
            __hermes__.logger.info(
                f"Local cache dir '{cachedir}' doesn't exists:"
                " create it"
            )
            try:
                os.makedirs(cachedir, 0o777 & ~LocalCache._umask())
            except Exception as e:
                __hermes__.logger.fatal(
                    f"Unable to create local cache dir '{cachedir}':"
                    f" {str(e)}"
                )
                raise

        if not os.path.isdir(cachedir):
            err = (
                f"Local cache dir '{cachedir}'"
                " exists and is not a directory"
            )
            __hermes__.logger.fatal(err)
            raise HermesInvalidCacheDirError(err)

        if not os.access(cachedir, os.W_OK):
            err = (
                f"Local cache dir '{cachedir}'"
                " exists but is not writeable"
            )
            __hermes__.logger.fatal(err)
            raise HermesInvalidCacheDirError(err)

    def savecachefile(
        self, cacheFilename: str | None = None, dontKeepBackup: bool = False
    ):
        """Save the instance data to its cache file, only if it has changed.

        The content is written to a temporary file in the cache dir first, which
        is then renamed to the destination path, so a failure while writing the
        new content leaves the current cache file untouched. The temporary file
        is removed if anything fails.

        Args:
            cacheFilename: if not None, set as the cache filename (see
                setCacheFilename()) before saving
            dontKeepBackup: if True, the previous cache file is not rotated to a
                backup before being replaced

        Raises:
            HermesUnspecifiedCacheFilename: if no cache filename has been set
        """
        if cacheFilename is not None:
            self.setCacheFilename(cacheFilename)

        if self._localCache_filename is None:
            raise HermesUnspecifiedCacheFilename(
                "Unable to save cache file without having specified the cacheFilename"
                " with setCacheFilename()"
            )

        # Generate content before everything else to avoid cache corruption in case
        # of failure
        content = self.to_json(forCacheFile=True)

        found, filepath, _ = self._getExistingFilePath(self._localCache_filename)
        oldcontent = ""
        if found:
            with self._open(filepath, "rt") as f:
                oldcontent = f.read()

        # Save only if content has changed
        if content == oldcontent:
            return

        cachedir = LocalCache._cachedir()
        extension = LocalCache._extension()
        destpath = f"{cachedir}/{self._localCache_filename}{extension}"

        # Only create the temp file and retrieve its full path: it is closed right
        # away to be reopened with self._open(), which handles gzip compression
        with NamedTemporaryFile(
            dir=cachedir, suffix=extension, mode="wt", delete=False
        ) as tmp:
            tmpfilepath = tmp.name

        try:
            with self._open(tmpfilepath, "wt") as f:
                f.write(content)
            os.chmod(tmpfilepath, 0o666 & ~LocalCache._umask())

            if not dontKeepBackup:
                self._rotatecachefile(self._localCache_filename)
            # os.replace() overwrites an existing destination on every platform,
            # whereas os.rename() fails on Windows when the destination exists
            os.replace(tmpfilepath, destpath)
        except BaseException:
            # Don't leave an orphan temporary file in the cache dir. Best effort
            # only: the original error is the one worth propagating.
            try:
                os.remove(tmpfilepath)
            except OSError:
                pass
            raise

    def setCacheFilename(self, filename: str | None):
        """Set the cache filename (without directory nor extension) used by
        savecachefile()"""
        self._localCache_filename = filename

    @classmethod
    def loadcachefile(
        cls: type[AnyLocalCache], filename: str, **kwargs: None | Any
    ) -> AnyLocalCache:
        """Create an instance from the specified cache file.

        If no cache file (plain or gzipped) is found, the instance is created from
        empty data ("{}"). kwargs are passed to the constructor through
        from_json(). The cache filename of the returned instance is set to
        filename.
        """
        found, filepath, _ = cls._getExistingFilePath(filename)
        if not found:
            __hermes__.logger.info(
                f"Specified cache file '{filepath}' doesn't exists,"
                " returning empty data"
            )
            jsondata = "{}"
        else:
            with cls._open(filepath, "rt") as f:
                jsondata = f.read()

        ret = cls.from_json(jsondata, **kwargs)
        ret.setCacheFilename(filename)
        return ret

    @classmethod
    def _getExistingFilePath(
        cls: type[AnyLocalCache], filename: str
    ) -> tuple[bool, str, str | None]:
        """Check if specified filename exists with default extension, or with
        the other one. This allows the user to change the "enable_compression"
        setting without breaking the current cache.

        Returns a tuple (found, filepath, extension)
            - found: boolean indicating if filepath was found
            - filepath: if found: str indicating filepath, otherwise filepath with
              default extension (may be useful for logging)
            - extension: if found: str containing the extension of the filepath
              found, None otherwise
        """
        compress = LocalCache._compressCache()
        cachedir = LocalCache._cachedir()
        for extension in (
            # Extension that should be used according to Config
            LocalCache._extensions[compress],
            # Extension that could be used if settings has changed
            LocalCache._extensions[not compress],
        ):
            filepath = f"{cachedir}/{filename}{extension}"
            if os.path.exists(filepath):
                return (True, filepath, extension)

        # Not found
        return (False, f"{cachedir}/{filename}{LocalCache._extensions[compress]}", None)

    @classmethod
    def _open(cls: type[AnyLocalCache], path: str, mode: str = "r") -> "IO":
        """Open path with gzip.open() when it ends with the gzipped extension, with
        the builtin open() otherwise"""
        gzipped = path.endswith(LocalCache._extensions[True])
        _open: Callable[[str, str], "IO"] = gzip.open if gzipped else open
        return _open(path, mode)

    @classmethod
    def _rotatecachefile(cls: type[AnyLocalCache], filename: str):
        """Shift the existing cache file and its backups by one backup index.

        The backup at index _backupCount() is overwritten by the one before it,
        and the main cache file becomes the backup at index 1. Each file keeps its
        own extension.
        """
        cachedir = LocalCache._cachedir()
        # Iterate from the oldest backup to avoid overwriting a file before it
        # has been moved
        for i in range(LocalCache._backupCount(), 0, -1):
            oldsuffix = LocalCache._backupSuffix(i - 1) if i > 1 else ""
            found, old, ext = cls._getExistingFilePath(f"{filename}{oldsuffix}")
            if found:
                new = f"{cachedir}/{filename}{LocalCache._backupSuffix(i)}{ext}"
                os.replace(old, new)

    @classmethod
    def deleteAllCacheFiles(cls: type[AnyLocalCache], filename: str):
        """Delete cache files and its backups with specified filename.

        Backups are looked up with the indexes _rotatecachefile() produces
        (1 to _backupCount()), and both the plain and the gzipped variant of each
        file are removed.
        """
        names = [filename] + [
            f"{filename}{LocalCache._backupSuffix(i)}"
            for i in range(LocalCache._backupCount(), 0, -1)
        ]
        for name in names:
            # Loop as both the plain and the gzipped variant may exist if the
            # "enable_compression" setting has changed
            while True:
                found, path, _ = cls._getExistingFilePath(name)
                if not found:
                    break
                __hermes__.logger.debug(f"Deleting '{path}'")
                os.remove(path)