Coverage for plugins / attributes / crypto_RSA_OAEP / crypto_RSA_OAEP.py: 52%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:16 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any, Type 

24 

25from Cryptodome.PublicKey import RSA 

26from Cryptodome.Cipher import PKCS1_OAEP 

27from Cryptodome.Cipher.PKCS1_OAEP import PKCS1OAEP_Cipher 

28from Cryptodome.Hash import ( 

29 SHA224, 

30 SHA256, 

31 SHA384, 

32 SHA512, 

33 SHA3_224, 

34 SHA3_256, 

35 SHA3_384, 

36 SHA3_512, 

37) 

38from jinja2 import Undefined 

39 

40import base64 

41 

42from lib.plugins import AbstractAttributePlugin 

43 

# Name of the class, defined below in this module, that implements the plugin.
# NOTE(review): presumably looked up by the Hermes plugin loader — confirm.
HERMES_PLUGIN_CLASSNAME: str | None = "Attribute_Crypto_RSA_OAEP_Plugin"
"""The plugin class name defined in this module file"""

46 

47 

class Attribute_Crypto_RSA_OAEP_Plugin(AbstractAttributePlugin):
    """Plugin to encrypt/decrypt strings with asymmetric RSA keys, using PKCS#1
    OAEP, an asymmetric cipher based on RSA and the OAEP padding.

    Every key declared in the plugin settings is bound to exactly one
    operation when the plugin is instantiated: a key that contains a private
    part is used to decrypt, a public-only key is used to encrypt.
    """

    # Accepted values of the per-key "hash" setting, mapped to the matching
    # Cryptodome.Hash object that is handed to PKCS1_OAEP.new() as hashAlgo.
    __hashclasses: dict[str, Type[Any]] = {
        "SHA224": SHA224,
        "SHA256": SHA256,
        "SHA384": SHA384,
        "SHA512": SHA512,
        "SHA3_224": SHA3_224,
        "SHA3_256": SHA3_256,
        "SHA3_384": SHA3_384,
        "SHA3_512": SHA3_512,
    }

    def __init__(self, settings: dict[str, Any]) -> None:
        """Instantiate a new plugin and prepare one cipher per configured key.

        The settings dict is forwarded to the parent constructor, then each
        entry of settings["keys"] is turned into an (operation, cipher) pair.

        Args:
            settings: plugin settings. settings["keys"] maps a key name to a
                dict holding "rsa_key" (the RSA key to import) and "hash" (one
                of the names listed in __hashclasses).

        Raises:
            KeyError: if "keys", "rsa_key" or "hash" is missing, or if the
                "hash" value is not one of the supported names.
        """
        super().__init__(settings)

        # key name -> ("encrypt" | "decrypt", ready-to-use OAEP cipher)
        self.__keys: dict[str, tuple[str, PKCS1OAEP_Cipher]] = {}
        for key, keysettings in settings["keys"].items():
            rsa = RSA.import_key(keysettings["rsa_key"])
            # The key type decides the operation: only a private key can
            # decrypt, so a public-only key is used for encryption.
            operation = "decrypt" if rsa.has_private() else "encrypt"
            self.__keys[key] = (
                operation,
                PKCS1_OAEP.new(
                    rsa,
                    hashAlgo=self.__hashclasses[keysettings["hash"]],
                ),
            )

    def filter(
        self, value: bytes | str | None | Undefined, keyname: str
    ) -> str | Undefined:
        """Call the plugin with specified value and keyname, and return the result.

        The operation depends on the type of the key registered as keyname:
        decryption for private keys, and encryption for public keys.

        Encryption: value is either a byte-array or a string, and the result
        is a base64 encoded byte-array.

        Decryption: value is either a byte-array or a base64 encoded
        byte-array, and the result is a string.

        Args:
            value: the data to process. None and Undefined are not processed.
            keyname: name of one of the keys declared in the plugin settings.

        Returns:
            The encrypted or decrypted string. A new Undefined when value is
            None, or value itself when it already is an Undefined.

        Raises:
            IndexError: if keyname is not one of the configured keys.
            TypeError: if value is neither bytes nor str (nor None/Undefined).
        """
        if keyname not in self.__keys:
            # NOTE(review): the message mentions "RSA_OAEP_crypto" whereas this
            # module is named crypto_RSA_OAEP — confirm which path is right.
            raise IndexError(
                f"Specified {keyname=} doesn't exist in"
                " hermes.plugins.attributes.RSA_OAEP_crypto.settings in config file"
            )

        if value is None:
            return Undefined(hint="No value specified")

        # Hand an undefined value back untouched instead of processing it.
        if isinstance(value, Undefined):
            return value

        # isinstance() rather than an exact type() match, so that subclasses
        # of str or bytes (e.g. markup-safe strings) are accepted as well.
        if not isinstance(value, (bytes, str)):
            raise TypeError(
                f"Invalid value type {type(value)=}. Valid types are (bytes, str)"
            )

        operation, cipherinst = self.__keys[keyname]

        if operation == "decrypt":
            return self.decrypt(value, cipherinst)
        return self.encrypt(value, cipherinst)

    def encrypt(self, value: bytes | str, cipherinst: PKCS1OAEP_Cipher) -> str:
        """Encrypt the specified value, and return the resulting byte-array
        encoded in a base64 string.

        Args:
            value: raw bytes, or a string that is encoded to UTF-8 first.
            cipherinst: the OAEP cipher to encrypt with.

        Returns:
            The ciphertext, base64 encoded, as an ASCII string.
        """
        if isinstance(value, bytes):
            # Raw binary encoded byte array
            value_bytes = value
        else:
            # UTF8 string
            value_bytes = value.encode("utf8")

        return base64.b64encode(cipherinst.encrypt(value_bytes)).decode("ascii")

    def decrypt(self, value: bytes | str, cipherinst: PKCS1OAEP_Cipher) -> str:
        """Decrypt the specified value, and return the result as a string.

        Args:
            value: the raw ciphertext bytes, or the ciphertext as a base64
                encoded string.
            cipherinst: the OAEP cipher to decrypt with.

        Returns:
            The decrypted data, decoded as UTF-8.
        """
        if isinstance(value, bytes):
            # Raw binary encoded byte array
            value_bytes = value
        else:
            # Base64 encoded byte array
            value_bytes = base64.b64decode(value.encode("ascii"))

        return cipherinst.decrypt(value_bytes).decode("utf8")