Coverage for plugins/attributes/crypto_RSA_OAEP/crypto_RSA_OAEP.py: 52%
42 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any, Type
25from Cryptodome.PublicKey import RSA
26from Cryptodome.Cipher import PKCS1_OAEP
27from Cryptodome.Cipher.PKCS1_OAEP import PKCS1OAEP_Cipher
28from Cryptodome.Hash import (
29 SHA224,
30 SHA256,
31 SHA384,
32 SHA512,
33 SHA3_224,
34 SHA3_256,
35 SHA3_384,
36 SHA3_512,
37)
38from jinja2 import Undefined
40import base64
42from lib.plugins import AbstractAttributePlugin
45HERMES_PLUGIN_CLASSNAME: str | None = "Attribute_Crypto_RSA_OAEP_Plugin"
46"""The plugin class name defined in this module file"""
49class Attribute_Crypto_RSA_OAEP_Plugin(AbstractAttributePlugin):
50 """Plugin to encrypt/decrypt strings with asymmetric RSA keys, using PKCS#1
51 OAEP,an asymmetric cipher based on RSA and the OAEP padding"""
53 __hashclasses: dict[str, Type[Any]] = {
54 "SHA224": SHA224,
55 "SHA256": SHA256,
56 "SHA384": SHA384,
57 "SHA512": SHA512,
58 "SHA3_224": SHA3_224,
59 "SHA3_256": SHA3_256,
60 "SHA3_384": SHA3_384,
61 "SHA3_512": SHA3_512,
62 }
64 def __init__(self, settings: dict[str, any]) -> None:
65 """Instantiate new plugin and store a copy of its settings dict in
66 self._settings"""
67 super().__init__(settings)
69 self.__keys: dict[str, tuple[str, PKCS1OAEP_Cipher]] = {}
70 for key, keysettings in settings["keys"].items():
71 rsa = RSA.importKey(keysettings["rsa_key"])
72 operation = "decrypt" if rsa.has_private() else "encrypt"
73 self.__keys[key] = (
74 operation,
75 PKCS1_OAEP.new(
76 rsa,
77 hashAlgo=self.__hashclasses[keysettings["hash"]],
78 ),
79 )
81 def filter(self, value: bytes | str | None | Undefined, keyname: str) -> str:
82 """Call the plugin with specified value and keyname, and returns the result.
83 The plugin will determine if it's an encryption or a decryption operation upon
84 the key type: decryption for private keys, and encryption for public keys.
86 Encryption: value is either a byte-array or a string, and result is a base64
87 encoded byte-array.
89 Decryption: value is either a byte-array or a base64 encoded byte-array, and
90 result is a string.
91 """
92 if keyname not in self.__keys:
93 raise IndexError(
94 f"Specified {keyname=} doesn't exist in"
95 " hermes.plugins.attributes.RSA_OAEP_crypto.settings in config file"
96 )
98 if value is None:
99 return Undefined(hint="No value specified")
101 if isinstance(value, Undefined):
102 return value
104 if type(value) not in (bytes, str):
105 raise TypeError(
106 f"Invalid value type {type(value)=}. Valid types are (bytes, str)"
107 )
109 operation, cipherinst = self.__keys[keyname]
111 if operation == "decrypt":
112 return self.decrypt(value, cipherinst)
113 else:
114 return self.encrypt(value, cipherinst)
116 def encrypt(self, value: bytes | str, cipherinst: PKCS1OAEP_Cipher) -> str:
117 """Encrypt the specified value, and returns the resulting byte-array
118 encoded in a base64 string"""
119 if type(value) is bytes:
120 # Raw binary encoded byte array
121 value_bytes = value
122 else:
123 # UTF8 string
124 value_bytes = value.encode("utf8")
126 return base64.b64encode(cipherinst.encrypt(value_bytes)).decode("ascii")
128 def decrypt(self, value: bytes | str, cipherinst: PKCS1OAEP_Cipher) -> str:
129 """Decrypt the specified value, and returns the result as a string"""
130 if type(value) is bytes:
131 # Raw binary encoded byte array
132 value_bytes = value
133 else:
134 # Base64 encoded byte array
135 value_bytes = base64.b64decode(value.encode("ascii"))
137 return cipherinst.decrypt(value_bytes).decode("utf8")