Coverage for plugins/datasources/ldap/ldap.py: 26%
102 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-28 07:25 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4# Hermes : Change Data Capture (CDC) tool from any source(s) to any target
5# Copyright (C) 2023, 2024 INSA Strasbourg
6#
7# This file is part of Hermes.
8#
9# Hermes is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# Hermes is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with Hermes. If not, see <https://www.gnu.org/licenses/>.
23from typing import Any, TYPE_CHECKING
25if TYPE_CHECKING: # pragma: no cover
26 # Only for type hints, won't import at runtime
27 from ldap.ldapobject import LDAPObject
29from lib.plugins import AbstractDataSourcePlugin
30import ldap
31import ldap.modlist
32from datetime import datetime
# Module-level constant holding the name of the plugin class implemented in
# this file; its value matches the DatasourceLdap class defined in this module.
# NOTE(review): presumably read by the Hermes plugin loader to locate the
# class - confirm against lib.plugins.
34HERMES_PLUGIN_CLASSNAME: str | None = "DatasourceLdap"
35"""The plugin class name defined in this module file"""
class DatasourceLdap(AbstractDataSourcePlugin):
    """Remote Data Source for LDAP server.

    Settings keys read by this class:
      - "ssl": mapping that may contain "cafile", "certfile" and "keyfile"
      - "uri", "binddn", "bindpassword": used by open()
      - "basedn": default search base used by fetch()
      - "always_return_values_in_list": fetch() result shape (see fetch())
    """

    def __init__(self, settings: dict[str, Any]):
        """Instantiate new plugin and apply the global python-ldap options.

        The settings dict is handed to the parent constructor; the other
        methods read it back from self._settings.
        """
        super().__init__(settings)
        # No connection until open() is called
        self._ldap: LDAPObject | None = None

        # Not well documented in python-ldap,
        # Found this in doc/drafts/draft-ietf-ldapext-ldap-c-api-xx.txt
        # in openldap-master:
        #   LDAP_OPT_RESTART (0x09)
        #   Type for invalue parameter: void * (LDAP_OPT_ON or LDAP_OPT_OFF)
        #
        #   Type for outvalue parameter: int *
        #
        #   Description:
        #     Determines whether LDAP I/O operations are automatically
        #     restarted if they abort prematurely. It MAY be set to one of
        #     the constants LDAP_OPT_ON or LDAP_OPT_OFF; any non-NULL pointer
        #     value passed to ldap_set_option() enables this option. When
        #     reading the current setting using ldap_get_option(), a zero
        #     value means OFF and any non-zero value means ON. This option
        #     is useful if an LDAP I/O operation can be interrupted
        #     prematurely, for example by a timer going off, or other
        #     interrupt. By default, this option is OFF.
        #
        # Prevents a handled SIGINT from raising an exception
        ldap.set_option(ldap.OPT_RESTART, ldap.OPT_ON)

        ssl_settings = settings["ssl"]
        if ssl_settings:
            # Only the TLS files present in the settings are configured
            for key, option in (
                ("cafile", ldap.OPT_X_TLS_CACERTFILE),
                ("certfile", ldap.OPT_X_TLS_CERTFILE),
                ("keyfile", ldap.OPT_X_TLS_KEYFILE),
            ):
                if key in ssl_settings:
                    ldap.set_option(option, ssl_settings[key])

        # Must come after the TLS options above.
        # NOTE(review): the extracted source lost its indentation, so it is
        # unclear whether this call originally sat inside the `if` above; it
        # is issued unconditionally here - confirm against the original file.
        ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

    def open(self):
        """Establish connection with LDAP server.

        Initializes the connection from the "uri" setting, then performs a
        synchronous simple bind with "binddn" / "bindpassword".
        """
        self._ldap = ldap.initialize(self._settings["uri"], bytes_mode=False)
        self._ldap.simple_bind_s(
            who=self._settings["binddn"],
            cred=self._settings["bindpassword"],
        )

    def close(self):
        """Close connection with LDAP server.

        Does nothing when no connection is open. The connection handle is
        always reset, even if the unbind itself raises.
        """
        if self._ldap is None:
            return

        try:
            self._ldap.unbind_s()
        finally:
            self._ldap = None

    def fetch(
        self,
        query: str | None,
        vars: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Fetch data from datasource with specified vars. Query is ignored.

        Optional keys of vars:
          - "base": search base, defaults to the "basedn" setting
          - "scope": one of "base", "one", "onelevel", "sub", "subtree";
            a missing or unknown value means subtree
          - "filter": LDAP filter, defaults to "(objectClass=*)"
          - "attrlist": attribute names to request; missing or None means
            every attribute returned with the entry

        Returns a list of dict containing each entry fetched, with
        REMOTE_ATTRIBUTES as keys, and corresponding fetched values as
        values. A value is a list when the attribute has several values or
        when the "always_return_values_in_list" setting is true, a single
        converted value when there is exactly one, and None when the
        attribute is absent from the entry.
        """
        scopes = {
            "base": ldap.SCOPE_BASE,
            "one": ldap.SCOPE_ONELEVEL,
            "onelevel": ldap.SCOPE_ONELEVEL,
            "sub": ldap.SCOPE_SUBTREE,
            "subtree": ldap.SCOPE_SUBTREE,
            "DEFAULT": ldap.SCOPE_SUBTREE,
        }
        attrlist = vars.get("attrlist")

        results = self._ldap.search_s(
            base=vars.get("base", self._settings["basedn"]),
            # Fall back to the default scope *constant* for unknown names
            # (the lookup used to fall back to the string "DEFAULT")
            scope=scopes.get(vars.get("scope", "DEFAULT"), scopes["DEFAULT"]),
            filterstr=vars.get("filter", "(objectClass=*)"),
            attrlist=attrlist,
        )

        fetcheddata = []
        for _dn, entry in results:
            if not isinstance(entry, dict):
                # NOTE(review): python-ldap is expected to return search
                # references as (None, [urls]); they carry no attributes,
                # so they are skipped - confirm against python-ldap docs.
                continue

            attrnames = entry.keys() if attrlist is None else attrlist
            flatentry = {}
            for attrname in attrnames:
                values = entry.get(attrname, [])
                if (
                    self._settings["always_return_values_in_list"]
                    or len(values) > 1
                ):
                    flatentry[attrname] = self._convert_from_ldap(values)
                elif len(values) == 1:
                    flatentry[attrname] = self._convert_from_ldap(values[0])
                else:
                    flatentry[attrname] = None
            fetcheddata.append(flatentry)

        return fetcheddata

    def add(self, query: str | None, vars: dict[str, Any]):
        """Add LDAP entries on datasource with specified vars. Query is ignored.

        Items without a "dn", or without any attribute to add, are skipped.

        Example of vars dict:
        vars = {
            "addlist": [
                {
                    "dn": "uid=whatever,ou=company,dc=example,dc=com",  # Mandatory
                    "add": {  # Optional
                        # Create attribute if it doesn't exist,
                        # and add "value" to it
                        "attrnameToAdd": "value",
                        # Create attribute if it doesn't exist,
                        # and add "value1" and "value2" to it
                        "attrnameToAddList": ["value1", "value2"],
                    },
                },
                {
                    "dn": "uid=otherdn,ou=company,dc=example,dc=com",
                    ...
                },
                ...
            ]
        }
        """
        for item in vars.get("addlist", []):
            dn = item.get("dn")
            if not dn:
                continue

            addlist = [
                (attrname, self.convert_to_ldap(attrvalue))
                for attrname, attrvalue in item.get("add", {}).items()
            ]

            if addlist:
                self._ldap.add_s(dn, addlist)

    def delete(self, query: str | None, vars: dict[str, Any]):
        """Delete LDAP entries on datasource with specified vars. Query is ignored.

        Items without a "dn" are skipped.

        Example of vars dict:
        vars = {
            "dellist": [
                {
                    "dn": "uid=whatever,ou=company,dc=example,dc=com",  # Mandatory
                },
                {
                    "dn": "uid=otherdn,ou=company,dc=example,dc=com",
                    ...
                },
                ...
            ]
        }
        """
        for item in vars.get("dellist", []):
            dn = item.get("dn")
            if dn:
                self._ldap.delete_s(dn)

    def modify(self, query: str | None, vars: dict[str, Any]):
        """Modify LDAP entries on datasource with specified vars. Query is ignored.

        For each item, the "delete" changes are queued first, then "add",
        then "modify", and sent to the server in a single modify operation.
        Items without a "dn", or without any change, are skipped.

        Example of vars dict:
        vars = {
            "modlist": [
                {
                    "dn": "uid=whatever,ou=company,dc=example,dc=com",  # Mandatory
                    "add": {  # Optional
                        # Create attribute if it doesn't exist,
                        # and add "value" to it
                        "attrnameToAdd": "value",
                        # Create attribute if it doesn't exist,
                        # and add "value1" and "value2" to it
                        "attrnameToAddList": ["value1", "value2"],
                    },
                    "modify": {  # Optional
                        # Create attribute if it doesn't exist,
                        # and replace all its value by "value"
                        "attrnameToModify": "newvalue",
                        # Create attribute if it doesn't exist,
                        # and replace all its value by "newvalue1" and "newvalue2"
                        "attrnameToModifyList": ["newvalue1", "newvalue2"],
                    },
                    "delete": {  # Optional
                        # Delete specified attribute and all of its values
                        "attrnameToDelete": None,
                        # Delete "value" from specified attribute.
                        # Raise an error if value is missing
                        "attrnameToDeleteValue": "value",
                        # Delete "value1" and "value2" from specified attribute.
                        # Raise an error if a value is missing
                        "attrnameToDeleteValueList": ["value1", "value2"],
                    },
                },
                {
                    "dn": "uid=otherdn,ou=company,dc=example,dc=com",
                    ...
                },
                ...
            ]
        }
        """
        # Order matters: it is the order of the changes in the request
        actions = (
            ("delete", ldap.MOD_DELETE),
            ("add", ldap.MOD_ADD),
            ("modify", ldap.MOD_REPLACE),
        )

        for item in vars.get("modlist", []):
            dn = item.get("dn")
            if not dn:
                continue

            modlist = [
                (ldapaction, attrname, self.convert_to_ldap(attrvalue))
                for action, ldapaction in actions
                for attrname, attrvalue in item.get(action, {}).items()
            ]

            if modlist:
                self._ldap.modify_s(dn, modlist)

    @classmethod
    def _convert_from_ldap(
        cls, data: bytes | list[bytes]
    ) -> int | float | datetime | str | list[int | float | datetime | str]:
        """Convert specified raw LDAP data to native type and returns it.

        A list is converted item by item. A single value is decoded to str,
        then the first successful conversion among int, float and LDAP
        datetime is returned; if none succeeds the str itself is returned.

        NOTE: every numeric-looking string is converted, so a value such as
        "0012" comes back as the int 12.
        NOTE(review): decoding happens outside the try block, so a value
        that is not valid text in the default encoding raises
        UnicodeDecodeError - confirm whether binary attributes can reach
        this method.
        """
        if isinstance(data, list):
            return [cls._convert_from_ldap(item) for item in data]

        val = data.decode()
        for convert in (int, float, cls._convertdatetime_from_ldap):
            try:
                return convert(val)  # Conversion succeeded
            except ValueError:
                continue

        return val  # str

    @classmethod
    def _convertdatetime_from_ldap(cls, val: str) -> datetime:
        """Convert specified val to a naive datetime expressed in UTC.

        val must match "%Y%m%d%H%M%S%z" (e.g. "20240131235959Z" or
        "20240131235959+0200"). Raise ValueError if conversion failed.
        """
        parsed = datetime.strptime(val, "%Y%m%d%H%M%S%z")
        # %z is mandatory in the format, so parsed is always aware. Shift to
        # UTC before dropping tzinfo: merely stripping it would silently
        # discard a non-zero offset.
        return (parsed - parsed.utcoffset()).replace(tzinfo=None)

    @staticmethod
    def convert_to_ldap(value: Any) -> list[bytes] | None:
        """Convert specified value to ldap format.

        Returns None when value is None, otherwise a list of UTF-8 encoded
        bytes: one item per element when value is a list, a single item
        otherwise. datetime instances are written as LDAP datetimes, bytes
        are kept unchanged, anything else goes through str().
        """
        # Particular case for delete operation, must not be encased in list
        if value is None:
            return None

        # Always work on a list. A lone datetime must be wrapped too,
        # otherwise its string form would be split into single characters.
        values = value if isinstance(value, list) else [value]

        return [DatasourceLdap._encode_value(item) for item in values]

    @staticmethod
    def _encode_value(value: Any) -> bytes:
        """Encode a single value to the bytes sent to the LDAP server"""
        if isinstance(value, (bytes, bytearray)):
            # Already binary: str() would yield the "b'...'" representation
            return bytes(value)

        if isinstance(value, datetime):
            value = DatasourceLdap._convertdatetime_to_ldap(value)

        # Convert value to str if necessary, then to bytes
        return str(value).encode("utf-8")

    @classmethod
    def _convertdatetime_to_ldap(cls, dt: datetime) -> str:
        """Convert specified datetime dt to ldap str and returns it.

        The result always carries the "Z" (UTC) suffix: naive datetimes are
        written as they are, aware ones are first shifted to UTC.
        """
        offset = dt.utcoffset()
        if offset is not None:
            dt = (dt - offset).replace(tzinfo=None)
        return dt.strftime("%Y%m%d%H%M%SZ")