Coverage for plugins / attributes / crypto_RSA_OAEP / crypto_RSA_OAEP.py: 52%
42 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:11 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:11 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any, Type
25from Cryptodome.PublicKey import RSA
26from Cryptodome.Cipher import PKCS1_OAEP
27from Cryptodome.Cipher.PKCS1_OAEP import PKCS1OAEP_Cipher
28from Cryptodome.Hash import (
29 SHA224,
30 SHA256,
31 SHA384,
32 SHA512,
33 SHA3_224,
34 SHA3_256,
35 SHA3_384,
36 SHA3_512,
37)
38from jinja2 import Undefined
40import base64
42from lib.plugins import AbstractAttributePlugin
# Name of the class in this module that implements the plugin; it matches the
# class defined below.
# NOTE(review): presumably read by the plugin loader to locate the class —
# confirm against lib.plugins.
HERMES_PLUGIN_CLASSNAME: str | None = "Attribute_Crypto_RSA_OAEP_Plugin"
"""The plugin class name defined in this module file"""
class Attribute_Crypto_RSA_OAEP_Plugin(AbstractAttributePlugin):
    """Plugin to encrypt/decrypt strings with asymmetric RSA keys, using PKCS#1
    OAEP, an asymmetric cipher based on RSA and the OAEP padding"""

    # Hash names accepted in a key's "hash" setting, mapped to the matching
    # Cryptodome.Hash import that is handed to PKCS1_OAEP.new() as hashAlgo.
    __hashclasses: dict[str, Type[Any]] = {
        "SHA224": SHA224,
        "SHA256": SHA256,
        "SHA384": SHA384,
        "SHA512": SHA512,
        "SHA3_224": SHA3_224,
        "SHA3_256": SHA3_256,
        "SHA3_384": SHA3_384,
        "SHA3_512": SHA3_512,
    }

    def __init__(self, settings: dict[str, Any]) -> None:
        """Instantiate a new plugin and prepare one cipher per configured key.

        The settings dict is passed to the parent constructor. Then, for each
        entry of settings["keys"], the "rsa_key" value is imported and a
        PKCS#1 OAEP cipher is built with the hash named by the "hash" value.

        Raises:
            KeyError: if "keys", "rsa_key" or "hash" is missing, or if the
                "hash" value is not one of the supported hash names.
        """
        super().__init__(settings)

        # keyname -> (operation, cipher), where operation is "decrypt" for a
        # private key and "encrypt" for a public key.
        self.__keys: dict[str, tuple[str, PKCS1OAEP_Cipher]] = {}
        for key, keysettings in settings["keys"].items():
            rsa = RSA.importKey(keysettings["rsa_key"])
            operation = "decrypt" if rsa.has_private() else "encrypt"
            self.__keys[key] = (
                operation,
                PKCS1_OAEP.new(
                    rsa,
                    hashAlgo=self.__hashclasses[keysettings["hash"]],
                ),
            )

    def filter(
        self, value: bytes | str | None | Undefined, keyname: str
    ) -> str | Undefined:
        """Call the plugin with specified value and keyname, and returns the result.
        The plugin will determine if it's an encryption or a decryption operation upon
        the key type: decryption for private keys, and encryption for public keys.

        Encryption: value is either a byte-array or a string, and result is a base64
        encoded byte-array.

        Decryption: value is either a byte-array or a base64 encoded byte-array, and
        result is a string.

        A None value yields a new Undefined, and an Undefined value is returned
        unchanged, so missing data passes through without raising.

        Raises:
            IndexError: if keyname is not one of the configured keys.
            TypeError: if value is neither bytes nor str (subclasses accepted).
        """
        if keyname not in self.__keys:
            # NOTE(review): the settings path quoted in this message
            # (RSA_OAEP_crypto) differs from this module's directory name
            # (crypto_RSA_OAEP) — confirm which one the config file uses.
            raise IndexError(
                f"Specified {keyname=} doesn't exist in"
                " hermes.plugins.attributes.RSA_OAEP_crypto.settings in config file"
            )

        if value is None:
            return Undefined(hint="No value specified")

        if isinstance(value, Undefined):
            return value

        # isinstance() rather than an exact type() match, so that str/bytes
        # subclasses (e.g. markup-safe strings produced by templates) are
        # accepted instead of being rejected as an invalid type.
        if not isinstance(value, (bytes, str)):
            raise TypeError(
                f"Invalid value type {type(value)=}. Valid types are (bytes, str)"
            )

        operation, cipherinst = self.__keys[keyname]

        if operation == "decrypt":
            return self.decrypt(value, cipherinst)
        return self.encrypt(value, cipherinst)

    def encrypt(self, value: bytes | str, cipherinst: PKCS1OAEP_Cipher) -> str:
        """Encrypt the specified value, and returns the resulting byte-array
        encoded in a base64 string.

        bytes input is encrypted as-is; str input is UTF-8 encoded first.
        """
        if isinstance(value, bytes):
            # Raw binary encoded byte array
            value_bytes = value
        else:
            # UTF8 string
            value_bytes = value.encode("utf8")

        return base64.b64encode(cipherinst.encrypt(value_bytes)).decode("ascii")

    def decrypt(self, value: bytes | str, cipherinst: PKCS1OAEP_Cipher) -> str:
        """Decrypt the specified value, and returns the result as a string.

        bytes input is treated as the raw ciphertext; str input is treated as
        base64 text and decoded first. The decrypted bytes are decoded as UTF-8.
        """
        if isinstance(value, bytes):
            # Raw binary encoded byte array
            value_bytes = value
        else:
            # Base64 encoded byte array
            value_bytes = base64.b64decode(value.encode("ascii"))

        return cipherinst.decrypt(value_bytes).decode("utf8")