#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""LDAP methods module."""
from hvac import exceptions, utils
from hvac.api.vault_api_base import VaultApiBase
# Mount path used for every method on this class when the caller does not
# pass an explicit ``mount_point``; it is the default mount of the LDAP auth
# method in Vault.
DEFAULT_MOUNT_POINT = "ldap"
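class Ldap(VaultApiBase):
    """LDAP Auth Method (API).

    Reference: https://www.vaultproject.io/api/auth/ldap/index.html

    Every method builds its request path with ``utils.format_url`` and sends the
    request through ``self._adapter``. Caller-supplied ``mount_point``, ``name``
    and ``username`` values become path segments of that URL, so they must be
    the Vault-side identifiers, not arbitrary user input.
    NOTE(review): ``utils.format_url`` is relied upon to URL-quote each path
    segment; confirm against ``hvac.utils`` before passing untrusted values.
    """

    @staticmethod
    def _validate_list_params(list_params):
        """
        Reject any argument that is neither ``None`` nor a ``list``.

        Shared by :py:meth:`create_or_update_group` and
        :py:meth:`create_or_update_user` so that ``policies`` and ``groups`` are
        checked the same way and produce the same error text before anything is
        sent to Vault.

        :param list_params: Mapping of argument name to the value the caller supplied.
        :type list_params: dict
        :raises exceptions.ParamValidationError: if a value is present but is not a list.
        """
        for param_name, param_arg in list_params.items():
            if param_arg is not None and not isinstance(param_arg, list):
                error_msg = '"{param_name}" argument must be an instance of list or None, "{param_type}" provided.'.format(
                    param_name=param_name,
                    param_type=type(param_arg),
                )
                raise exceptions.ParamValidationError(error_msg)

    def read_configuration(self, mount_point=DEFAULT_MOUNT_POINT):
        """
        Retrieve the LDAP configuration for the auth method.

        Supported methods:
            GET: /auth/{mount_point}/config. Produces: 200 application/json

        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the read_configuration request.
        :rtype: dict
        """
        api_path = utils.format_url('/v1/auth/{mount_point}/config', mount_point=mount_point)
        response = self._adapter.get(
            url=api_path,
        )
        return response.json()

    def create_or_update_group(self, name, policies=None, mount_point=DEFAULT_MOUNT_POINT):
        """
        Create or update LDAP group policies.

        Supported methods:
            POST: /auth/{mount_point}/groups/{name}. Produces: 204 (empty body)

        :param name: The name of the LDAP group
        :type name: str | unicode
        :param policies: List of policies associated with the group. This parameter is transformed to a comma-delimited
            string before being passed to Vault, so every element must be a string. When ``None`` the key is omitted
            from the request body entirely.
        :type policies: list
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the create_or_update_group request.
        :rtype: requests.Response
        :raises exceptions.ParamValidationError: if ``policies`` is neither ``None`` nor a list.
        """
        self._validate_list_params({'policies': policies})

        params = {}
        if policies is not None:
            # Vault expects the policy set as one comma-delimited string.
            params['policies'] = ','.join(policies)

        api_path = utils.format_url(
            '/v1/auth/{mount_point}/groups/{name}',
            mount_point=mount_point,
            name=name,
        )
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def list_groups(self, mount_point=DEFAULT_MOUNT_POINT):
        """
        List the LDAP groups that have been created in this auth method.

        Supported methods:
            LIST: /auth/{mount_point}/groups. Produces: 200 application/json

        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the list_groups request.
        :rtype: dict
        """
        api_path = utils.format_url('/v1/auth/{mount_point}/groups', mount_point=mount_point)
        response = self._adapter.list(
            url=api_path,
        )
        return response.json()

    def read_group(self, name, mount_point=DEFAULT_MOUNT_POINT):
        """
        Read policies associated with a LDAP group.

        Supported methods:
            GET: /auth/{mount_point}/groups/{name}. Produces: 200 application/json

        :param name: The name of the LDAP group
        :type name: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the read_group request.
        :rtype: dict
        """
        # The group name is carried both in the path and in a JSON body on this
        # GET; the body is kept for wire compatibility with the existing behaviour.
        params = {
            'name': name,
        }
        api_path = utils.format_url(
            '/v1/auth/{mount_point}/groups/{name}',
            mount_point=mount_point,
            name=name,
        )
        response = self._adapter.get(
            url=api_path,
            json=params,
        )
        return response.json()

    def delete_group(self, name, mount_point=DEFAULT_MOUNT_POINT):
        """
        Delete a LDAP group and policy association.

        Supported methods:
            DELETE: /auth/{mount_point}/groups/{name}. Produces: 204 (empty body)

        :param name: The name of the LDAP group
        :type name: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the delete_group request.
        :rtype: requests.Response
        """
        api_path = utils.format_url(
            '/v1/auth/{mount_point}/groups/{name}',
            mount_point=mount_point,
            name=name,
        )
        return self._adapter.delete(
            url=api_path,
        )

    def create_or_update_user(self, username, policies=None, groups=None, mount_point=DEFAULT_MOUNT_POINT):
        """
        Create or update LDAP users policies and group associations.

        Supported methods:
            POST: /auth/{mount_point}/users/{username}. Produces: 204 (empty body)

        :param username: The username of the LDAP user
        :type username: str | unicode
        :param policies: List of policies associated with the user. This parameter is transformed to a comma-delimited
            string before being passed to Vault, so every element must be a string. When ``None`` the key is omitted
            from the request body entirely.
        :type policies: list
        :param groups: List of groups associated with the user. This parameter is transformed to a comma-delimited
            string before being passed to Vault, so every element must be a string. When ``None`` the key is omitted
            from the request body entirely.
        :type groups: list
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the create_or_update_user request.
        :rtype: requests.Response
        :raises exceptions.ParamValidationError: if ``policies`` or ``groups`` is neither ``None`` nor a list.
        """
        self._validate_list_params({
            'policies': policies,
            'groups': groups,
        })

        params = {}
        if policies is not None:
            # Vault expects both sets as comma-delimited strings.
            params['policies'] = ','.join(policies)
        if groups is not None:
            params['groups'] = ','.join(groups)

        api_path = utils.format_url(
            '/v1/auth/{mount_point}/users/{username}',
            mount_point=mount_point,
            username=username,
        )
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def list_users(self, mount_point=DEFAULT_MOUNT_POINT):
        """
        List existing users in the method.

        Supported methods:
            LIST: /auth/{mount_point}/users. Produces: 200 application/json

        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the list_users request.
        :rtype: dict
        """
        api_path = utils.format_url('/v1/auth/{mount_point}/users', mount_point=mount_point)
        response = self._adapter.list(
            url=api_path,
        )
        return response.json()

    def read_user(self, username, mount_point=DEFAULT_MOUNT_POINT):
        """
        Read policies associated with a LDAP user.

        Supported methods:
            GET: /auth/{mount_point}/users/{username}. Produces: 200 application/json

        :param username: The username of the LDAP user
        :type username: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the read_user request.
        :rtype: dict
        """
        api_path = utils.format_url(
            '/v1/auth/{mount_point}/users/{username}',
            mount_point=mount_point,
            username=username,
        )
        response = self._adapter.get(
            url=api_path,
        )
        return response.json()

    def delete_user(self, username, mount_point=DEFAULT_MOUNT_POINT):
        """
        Delete a LDAP user and policy association.

        Supported methods:
            DELETE: /auth/{mount_point}/users/{username}. Produces: 204 (empty body)

        :param username: The username of the LDAP user
        :type username: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the delete_user request.
        :rtype: requests.Response
        """
        api_path = utils.format_url(
            '/v1/auth/{mount_point}/users/{username}',
            mount_point=mount_point,
            username=username,
        )
        return self._adapter.delete(
            url=api_path,
        )

    def login(self, username, password, use_token=True, mount_point=DEFAULT_MOUNT_POINT):
        """
        Log in with LDAP credentials.

        Supported methods:
            POST: /auth/{mount_point}/login/{username}. Produces: 200 application/json

        The username is part of the request path (and so can surface wherever
        request URLs are recorded); the password travels only in the JSON body.

        :param username: The username of the LDAP user
        :type username: str | unicode
        :param password: The password for the LDAP user
        :type password: str | unicode
        :param use_token: if True, uses the token in the response received from the auth request to set the "token"
            attribute on the :py:meth:`hvac.adapters.Adapter` instance under the _adapter Client attribute, so that
            subsequent calls through this client are authenticated as the logged-in LDAP user.
        :type use_token: bool
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the login request.
        :rtype: requests.Response
        """
        params = {
            'password': password,
        }
        api_path = utils.format_url(
            '/v1/auth/{mount_point}/login/{username}',
            mount_point=mount_point,
            username=username,
        )
        # The adapter's login() handles storing the returned client token when use_token is set.
        return self._adapter.login(
            url=api_path,
            use_token=use_token,
            json=params,
        )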