# Copied from https://github.com/linuxmuster/linuxmuster-mailcow/blob/4409c726ebbea794f1ea6481684dd3c54339d0e0/src/ldapHelper.py import ldap, logging class LdapHelper: def __init__(self, ldapUri, ldapBindDn, ldapBindPassword, ldapBaseDn): self._uri = ldapUri self._bindDn = ldapBindDn self._bindPassword = ldapBindPassword self._baseDn = ldapBaseDn def __del__(self): self.unbind() def bind(self): try: self._ldapConnection = ldap.initialize(f"{self._uri}") self._ldapConnection.set_option(ldap.OPT_REFERRALS, 0) self._ldapConnection.simple_bind_s(self._bindDn, self._bindPassword) return True except Exception as e: logging.critical("!!! Error binding to ldap! {} !!!".format(e)) return False def unbind(self): if self._ldapConnection != None: self._ldapConnection.unbind_s() self._ldapConnection = None def search(self, filter, attrlist=None): if self._ldapConnection == None: logging.critical("Cannot talk to LDAP") return False, None try: rawResults = self._ldapConnection.search_s( self._baseDn, ldap.SCOPE_SUBTREE, filter, attrlist ) except Exception as e: logging.critical("Error executing LDAP search!") print(e) return False, None try: processedResults = [] if len(rawResults) <= 0 or rawResults[0][0] == None: return False, None for dn, rawResult in rawResults: if not dn: continue processedResult = {} for attribute, rawValue in rawResult.items(): try: if len(rawValue) == 1: processedResult[attribute] = str(rawValue[0].decode()) elif len(rawValue) > 0: processedResult[attribute] = [] for rawItem in rawValue: processedResult[attribute].append(str(rawItem.decode())) except UnicodeDecodeError: continue processedResults.append(processedResult) return True, processedResults except Exception as e: print(e) return False, None