Coverage for plugins/datasources/ldap/ldap.py: 26%

102 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-28 07:24 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target 

5# Copyright (C) 2023, 2024 INSA Strasbourg 

6# 

7# This file is part of Hermes. 

8# 

9# Hermes is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# Hermes is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with Hermes. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23from typing import Any, TYPE_CHECKING 

24 

25if TYPE_CHECKING: # pragma: no cover 

26 # Only for type hints, won't import at runtime 

27 from ldap.ldapobject import LDAPObject 

28 

29from lib.plugins import AbstractDataSourcePlugin 

30import ldap 

31import ldap.modlist 

32from datetime import datetime 

33 

34HERMES_PLUGIN_CLASSNAME: str | None = "DatasourceLdap" 

35"""The plugin class name defined in this module file""" 

36 

37 

38class DatasourceLdap(AbstractDataSourcePlugin): 

39 """Remote Data Source for LDAP server""" 

40 

41 def __init__(self, settings: dict[str, Any]): 

42 """Instantiate new plugin and store a copy of its settings dict in 

43 self._settings""" 

44 super().__init__(settings) 

45 self._ldap: LDAPObject | None = None 

46 

47 # Not well documented in python-ldap, 

48 # Found this in doc/drafts/draft-ietf-ldapext-ldap-c-api-xx.txt 

49 # in openldap-master: 

50 # LDAP_OPT_RESTART (0x09) 

51 # Type for invalue parameter: void * (LDAP_OPT_ON or LDAP_OPT_OFF) 

52 # 

53 # Type for outvalue parameter: int * 

54 # 

55 # Description: 

56 # Determines whether LDAP I/O operations are automatically res- 

57 # tarted if they abort prematurely. It MAY be set to one of the 

58 # constants LDAP_OPT_ON or LDAP_OPT_OFF; any non-NULL pointer 

59 # value passed to ldap_set_option() enables this option. When 

60 # reading the current setting using ldap_get_option(), a zero 

61 # value means OFF and any non-zero value means ON. This option 

62 # is useful if an LDAP I/O operation can be interrupted prema- 

63 # turely, for example by a timer going off, or other interrupt. 

64 # By default, this option is OFF. 

65 # 

66 # Will avoid that an handled SIGINT raises an exception 

67 ldap.set_option(ldap.OPT_RESTART, ldap.OPT_ON) 

68 

69 if settings["ssl"]: 

70 if "cafile" in settings["ssl"]: 

71 ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, settings["ssl"]["cafile"]) 

72 if "certfile" in settings["ssl"]: 

73 ldap.set_option(ldap.OPT_X_TLS_CERTFILE, settings["ssl"]["certfile"]) 

74 if "keyfile" in settings["ssl"]: 

75 ldap.set_option(ldap.OPT_X_TLS_KEYFILE, settings["ssl"]["keyfile"]) 

76 

77 ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0) 

78 

79 def open(self): 

80 """Establish connection with LDAP server""" 

81 self._ldap = ldap.initialize(self._settings["uri"], bytes_mode=False) 

82 self._ldap.simple_bind_s( 

83 who=self._settings["binddn"], 

84 cred=self._settings["bindpassword"], 

85 ) 

86 

87 def close(self): 

88 """Close connection with LDAP server""" 

89 self._ldap.unbind_s() 

90 self._ldap = None 

91 

92 def fetch( 

93 self, 

94 query: str | None, 

95 vars: dict[str, Any], 

96 ) -> list[dict[str, Any]]: 

97 """Fetch data from datasource with specified query and optional queryvars. 

98 Returns a list of dict containing each entry fetched, with REMOTE_ATTRIBUTES 

99 as keys, and corresponding fetched values as values""" 

100 scopes = { 

101 "base": ldap.SCOPE_BASE, 

102 "one": ldap.SCOPE_ONELEVEL, 

103 "onelevel": ldap.SCOPE_ONELEVEL, 

104 "sub": ldap.SCOPE_SUBTREE, 

105 "subtree": ldap.SCOPE_SUBTREE, 

106 "DEFAULT": ldap.SCOPE_SUBTREE, 

107 } 

108 fetcheddata = [] 

109 

110 results = self._ldap.search_s( 

111 base=vars.get("base", self._settings["basedn"]), 

112 scope=scopes.get(vars.get("scope", "DEFAULT"), "DEFAULT"), 

113 filterstr=vars.get("filter", "(objectClass=*)"), 

114 attrlist=vars.get("attrlist", None), 

115 ) 

116 

117 for dn, entry in results: 

118 flatentry = dict() 

119 for k in vars.get("attrlist", entry.keys()): 

120 v = entry.get(k, []) 

121 if self._settings["always_return_values_in_list"] or len(v) > 1: 

122 flatentry[k] = self._convert_from_ldap(v) 

123 elif len(v) == 1: 

124 flatentry[k] = self._convert_from_ldap(v[0]) 

125 else: 

126 flatentry[k] = None 

127 fetcheddata.append(flatentry) 

128 return fetcheddata 

129 

130 def add(self, query: str | None, vars: dict[str, Any]): 

131 """Add LDAP entries on datasource with specified vars. Query is ignored 

132 Example of vars dict: 

133 vars = { 

134 "addlist": [ 

135 { 

136 "dn": "uid=whatever,ou=company,dc=example,dc=com", # Mandatory 

137 "add": { # Facultative 

138 # Create attribute if it doesn't exist, 

139 # and add "value" to it 

140 "attrnameToAdd": "value", 

141 # Create attribute if it doesn't exist, 

142 # and add "value1" and "value2" to it 

143 "attrnameToAddList": ["value1", "value2"], 

144 }, 

145 }, 

146 { 

147 "dn": "uid=otherdn,ou=company,dc=example,dc=com", 

148 ... 

149 }, 

150 ... 

151 ] 

152 } 

153 """ 

154 for item in vars.get("addlist", []): 

155 addlist = [] 

156 

157 dn = item.get("dn") 

158 if not dn: 

159 continue 

160 

161 for attrname, attrvalue in item.get("add", {}).items(): 

162 addlist.append( 

163 (attrname, self.convert_to_ldap(attrvalue)), 

164 ) 

165 

166 if addlist: 

167 self._ldap.add_s(dn, addlist) 

168 

169 def delete(self, query: str | None, vars: dict[str, Any]): 

170 """Delete LDAP entries on datasource with specified vars. Query is ignored 

171 Example of vars dict: 

172 vars = { 

173 "dellist": [ 

174 { 

175 "dn": "uid=whatever,ou=company,dc=example,dc=com", # Mandatory 

176 }, 

177 { 

178 "dn": "uid=otherdn,ou=company,dc=example,dc=com", 

179 ... 

180 }, 

181 ... 

182 ] 

183 } 

184 """ 

185 for item in vars.get("dellist", []): 

186 dn = item.get("dn") 

187 if dn: 

188 self._ldap.delete_s(dn) 

189 

190 def modify(self, query: str | None, vars: dict[str, Any]): 

191 """Modify LDAP entries on datasource with specified vars. Query is ignored 

192 Example of vars dict: 

193 vars = { 

194 "modlist": [ 

195 { 

196 "dn": "uid=whatever,ou=company,dc=example,dc=com", # Mandatory 

197 "add": { # Facultative 

198 # Create attribute if it doesn't exist, 

199 # and add "value" to it 

200 "attrnameToAdd": "value", 

201 # Create attribute if it doesn't exist, 

202 # and add "value1" and "value2" to it 

203 "attrnameToAddList": ["value1", "value2"], 

204 }, 

205 "modify": { # Facultative 

206 # Create attribute if it doesn't exist, 

207 # and replace all its value by "value" 

208 "attrnameToModify": "newvalue", 

209 # Create attribute if it doesn't exist, 

210 # and replace all its value by "newvalue1" and "newvalue2" 

211 "attrnameToModifyList": ["newvalue1", "newvalue2"], 

212 }, 

213 "delete": { # Facultative 

214 # Delete specified attribute and all of its values 

215 "attrnameToDelete": None, 

216 # Delete "value" from specified attribute. 

217 # Raise an error if value is missing 

218 "attrnameToDeleteValue": "value", 

219 # Delete "value1" and "value2" from specified attribute. 

220 # Raise an error if a value is missing 

221 "attrnameToDeleteValueList": ["value1", "value2"], 

222 }, 

223 }, 

224 { 

225 "dn": "uid=otherdn,ou=company,dc=example,dc=com", 

226 ... 

227 }, 

228 ... 

229 ] 

230 } 

231 """ 

232 for item in vars.get("modlist", []): 

233 modlist = [] 

234 

235 dn = item.get("dn") 

236 if not dn: 

237 continue 

238 

239 for action, ldpaaction in ( 

240 ("delete", ldap.MOD_DELETE), 

241 ("add", ldap.MOD_ADD), 

242 ("modify", ldap.MOD_REPLACE), 

243 ): 

244 for attrname, attrvalue in item.get(action, {}).items(): 

245 modlist.append( 

246 (ldpaaction, attrname, self.convert_to_ldap(attrvalue)), 

247 ) 

248 

249 if modlist: 

250 self._ldap.modify_s(dn, modlist) 

251 

252 @classmethod 

253 def _convert_from_ldap( 

254 cls, data: bytearray | list[bytearray] 

255 ) -> int | float | datetime | str | list[int | float | datetime | str]: 

256 """Convert specified bytearray data to native type and returns it""" 

257 if type(data) is list: 

258 return [cls._convert_from_ldap(i) for i in data] 

259 

260 val = data.decode() 

261 tests = [ 

262 int, 

263 float, 

264 cls._convertdatetime_from_ldap, 

265 ] 

266 

267 for func in tests: 

268 try: 

269 res = func(val) 

270 except ValueError: 

271 pass 

272 else: 

273 return res # Conversion succeed 

274 

275 return val # str 

276 

277 @classmethod 

278 def _convertdatetime_from_ldap(cls, val: str) -> datetime: 

279 """Try to convert specified val to datetime returns it. 

280 Raise ValueError if conversion failed""" 

281 res = datetime.strptime(val, "%Y%m%d%H%M%S%z").replace(tzinfo=None) 

282 return res 

283 

284 @staticmethod 

285 def convert_to_ldap(value: Any) -> list[bytearray]: 

286 """Convert specified value to ldap format""" 

287 

288 # Particular case for delete operation, must not be encased in list 

289 if value is None: 

290 return None 

291 

292 # Convert dattime instances to ldap datetime 

293 if isinstance(value, datetime): 

294 res = DatasourceLdap._convertdatetime_to_ldap(value) 

295 elif type(value) is list: 

296 res = [] 

297 for listval in value: 

298 if isinstance(listval, datetime): 

299 res.append(DatasourceLdap._convertdatetime_to_ldap(listval)) 

300 else: 

301 res.append(listval) 

302 else: 

303 # Convert values to list 

304 res = [value] 

305 

306 # Convert values to str if necessary, then to bytes 

307 res = [str(v).encode("utf-8") for v in res] 

308 

309 return res 

310 

311 @classmethod 

312 def _convertdatetime_to_ldap(cls, dt: datetime) -> str: 

313 """Try to convert specified datetime dt to ldap str and returns it. 

314 Raise ValueError if conversion failed""" 

315 return dt.strftime("%Y%m%d%H%M%SZ")