Coverage for plugins/attributes/crypto_RSA_OAEP/crypto_RSA_OAEP.py: 52%

42 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any, Type 

24 

25from Cryptodome.PublicKey import RSA 

26from Cryptodome.Cipher import PKCS1_OAEP 

27from Cryptodome.Cipher.PKCS1_OAEP import PKCS1OAEP_Cipher 

28from Cryptodome.Hash import ( 

29 SHA224, 

30 SHA256, 

31 SHA384, 

32 SHA512, 

33 SHA3_224, 

34 SHA3_256, 

35 SHA3_384, 

36 SHA3_512, 

37) 

38from jinja2 import Undefined 

39 

40import base64 

41 

42from lib.plugins import AbstractAttributePlugin 

43 

44 

# Name of the plugin class implemented further down in this module.
# NOTE(review): presumably read by the plugin loader to find the class to
# instantiate; verify against lib.plugins.
HERMES_PLUGIN_CLASSNAME: str | None = "Attribute_Crypto_RSA_OAEP_Plugin"
"""The plugin class name defined in this module file"""

47 

48 

class Attribute_Crypto_RSA_OAEP_Plugin(AbstractAttributePlugin):
    """Plugin to encrypt/decrypt strings with asymmetric RSA keys, using PKCS#1
    OAEP, an asymmetric cipher based on RSA and the OAEP padding.

    Every key declared in the settings is bound once, at instantiation, to a
    single operation: "decrypt" when the imported RSA key contains a private
    part, "encrypt" otherwise.
    """

    # Hash names accepted in a key's "hash" setting, mapped to the
    # Cryptodome.Hash object passed to PKCS1_OAEP.new() as hashAlgo.
    __hashclasses: dict[str, Type[Any]] = {
        "SHA224": SHA224,
        "SHA256": SHA256,
        "SHA384": SHA384,
        "SHA512": SHA512,
        "SHA3_224": SHA3_224,
        "SHA3_256": SHA3_256,
        "SHA3_384": SHA3_384,
        "SHA3_512": SHA3_512,
    }

    def __init__(self, settings: dict[str, Any]) -> None:
        """Instantiate new plugin and build one OAEP cipher per configured key.

        settings is forwarded unchanged to the parent class constructor.
        settings["keys"] must map each key name to a dict providing "rsa_key"
        (the key material given to RSA.importKey) and "hash" (one of the names
        listed in __hashclasses).

        Raises KeyError if a required setting is missing or if "hash" is not a
        known hash name. Errors raised by RSA.importKey are not caught.
        """
        super().__init__(settings)

        # keyname -> (operation, cipher), operation being "decrypt" or "encrypt"
        self.__keys: dict[str, tuple[str, PKCS1OAEP_Cipher]] = {}
        for key, keysettings in settings["keys"].items():
            rsa = RSA.importKey(keysettings["rsa_key"])
            # A private key can only mean decryption here; a public key can
            # only encrypt.
            operation = "decrypt" if rsa.has_private() else "encrypt"
            self.__keys[key] = (
                operation,
                PKCS1_OAEP.new(
                    rsa,
                    hashAlgo=self.__hashclasses[keysettings["hash"]],
                ),
            )

    def filter(
        self, value: bytes | str | None | Undefined, keyname: str
    ) -> str | Undefined:
        """Call the plugin with specified value and keyname, and returns the result.
        The plugin will determine if it's an encryption or a decryption operation upon
        the key type: decryption for private keys, and encryption for public keys.

        Encryption: value is either a byte-array or a string, and result is a base64
        encoded byte-array.

        Decryption: value is either a byte-array or a base64 encoded byte-array, and
        result is a string.

        A None value yields a new Undefined, and an Undefined value is returned
        unchanged, so that missing data never reaches the cipher.

        Raises IndexError if keyname was not declared in the settings, and
        TypeError if value is neither bytes nor str.
        """
        # The key name is validated first: an unknown key is reported even when
        # value is None or Undefined.
        if keyname not in self.__keys:
            # NOTE(review): the message says "RSA_OAEP_crypto" whereas this
            # module is named crypto_RSA_OAEP - confirm the real config path
            # before changing the text.
            raise IndexError(
                f"Specified {keyname=} doesn't exist in"
                " hermes.plugins.attributes.RSA_OAEP_crypto.settings in config file"
            )

        if value is None:
            return Undefined(hint="No value specified")

        if isinstance(value, Undefined):
            return value

        # isinstance() rather than an exact type match, so that subclasses of
        # str or bytes are processed instead of being rejected.
        if not isinstance(value, (bytes, str)):
            raise TypeError(
                f"Invalid value type {type(value)=}. Valid types are (bytes, str)"
            )

        operation, cipherinst = self.__keys[keyname]

        if operation == "decrypt":
            return self.decrypt(value, cipherinst)
        return self.encrypt(value, cipherinst)

    def encrypt(self, value: bytes | str, cipherinst: PKCS1OAEP_Cipher) -> str:
        """Encrypt the specified value, and returns the resulting byte-array
        encoded in a base64 string.

        bytes are encrypted as is; a str is encoded to UTF-8 first.
        """
        if isinstance(value, bytes):
            # Raw binary encoded byte array
            value_bytes = value
        else:
            # UTF8 string
            value_bytes = value.encode("utf8")

        return base64.b64encode(cipherinst.encrypt(value_bytes)).decode("ascii")

    def decrypt(self, value: bytes | str, cipherinst: PKCS1OAEP_Cipher) -> str:
        """Decrypt the specified value, and returns the result as a string.

        bytes are taken as the raw ciphertext; a str is taken as the base64
        encoding of the ciphertext. The decrypted bytes are decoded as UTF-8.
        """
        if isinstance(value, bytes):
            # Raw binary encoded byte array
            value_bytes = value
        else:
            # Base64 encoded byte array
            value_bytes = base64.b64decode(value.encode("ascii"))

        return cipherinst.decrypt(value_bytes).decode("utf8")