Source code for hvac.v1

from __future__ import unicode_literals

import json
import os
from base64 import b64encode

from hvac import aws_utils, exceptions, adapters, utils, api
from hvac.constants.client import DEPRECATED_PROPERTIES, DEFAULT_URL

try:
    import hcl
    has_hcl_parser = True
except ImportError:
    has_hcl_parser = False


[docs]class Client(object): """The hvac Client class for HashiCorp's Vault."""
[docs] def __init__(self, url=None, token=None, cert=None, verify=True, timeout=30, proxies=None, allow_redirects=True, session=None, adapter=adapters.Request, namespace=None): """Creates a new hvac client instance. :param url: Base URL for the Vault instance being addressed. :type url: str :param token: Authentication token to include in requests sent to Vault. :type token: str :param cert: Certificates for use in requests sent to the Vault instance. This should be a tuple with the certificate and then key. :type cert: tuple :param verify: Either a boolean to indicate whether TLS verification should be performed when sending requests to Vault, or a string pointing at the CA bundle to use for verification. See http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification. :type verify: Union[bool,str] :param timeout: The timeout value for requests sent to Vault. :type timeout: int :param proxies: Proxies to use when performing requests. See: http://docs.python-requests.org/en/master/user/advanced/#proxies :type proxies: dict :param allow_redirects: Whether to follow redirects when sending requests to Vault. :type allow_redirects: bool :param session: Optional session object to use when performing request. :type session: request.Session :param adapter: Optional class to be used for performing requests. If none is provided, defaults to hvac.adapters.Request :type adapter: hvac.adapters.Adapter :param namespace: Optional Vault Namespace. :type namespace: str """ token = token if token is not None else utils.get_token_from_env() url = url if url else os.getenv('VAULT_ADDR', DEFAULT_URL) self._adapter = adapter( base_uri=url, token=token, cert=cert, verify=verify, timeout=timeout, proxies=proxies, allow_redirects=allow_redirects, session=session, namespace=namespace ) # Instantiate API classes to be exposed as properties on this class starting with auth method classes. self._auth = api.AuthMethods(adapter=self._adapter) self._secrets = api.SecretsEngines(adapter=self._adapter) self._sys = api.SystemBackend(adapter=self._adapter)
def __getattr__(self, name): return utils.getattr_with_deprecated_properties( obj=self, item=name, deprecated_properties=DEPRECATED_PROPERTIES ) @property def adapter(self): return self._adapter @adapter.setter def adapter(self, adapter): self._adapter = adapter @property def url(self): return self._adapter.base_uri @url.setter def url(self, url): self._adapter.base_uri = url @property def token(self): return self._adapter.token @token.setter def token(self, token): self._adapter.token = token @property def session(self): return self._adapter.session @session.setter def session(self, session): self._adapter.session = session @property def allow_redirects(self): return self._adapter.allow_redirects @allow_redirects.setter def allow_redirects(self, allow_redirects): self._adapter.allow_redirects = allow_redirects @property def auth(self): """Accessor for the Client instance's auth methods. Provided via the :py:class:`hvac.api.AuthMethods` class. :return: This Client instance's associated Auth instance. :rtype: hvac.api.AuthMethods """ return self._auth @property def secrets(self): """Accessor for the Client instance's secrets engines. Provided via the :py:class:`hvac.api.SecretsEngines` class. :return: This Client instance's associated SecretsEngines instance. :rtype: hvac.api.SecretsEngines """ return self._secrets @property def sys(self): """Accessor for the Client instance's system backend methods. Provided via the :py:class:`hvac.api.SystemBackend` class. :return: This Client instance's associated SystemBackend instance. :rtype: hvac.api.SystemBackend """ return self._sys @property def generate_root_status(self): return self.sys.read_root_generation_progress() @property def key_status(self): """GET /sys/key-status :return: Information about the current encryption key used by Vault. :rtype: dict """ return self.sys.get_encryption_key_status()['data'] @property def rekey_status(self): return self.sys.read_rekey_progress() @property def ha_status(self): """Read the high availability status and current leader instance of Vault. :return: The JSON response returned by read_leader_status() :rtype: dict """ return self.sys.read_leader_status() @property def seal_status(self): """Read the seal status of the Vault. This is an unauthenticated endpoint. Supported methods: GET: /sys/seal-status. Produces: 200 application/json :return: The JSON response of the request. :rtype: dict """ return self.sys.read_seal_status()
[docs] def read(self, path, wrap_ttl=None): """GET /<path> :param path: :type path: :param wrap_ttl: :type wrap_ttl: :return: :rtype: """ try: return self._adapter.get('/v1/{0}'.format(path), wrap_ttl=wrap_ttl).json() except exceptions.InvalidPath: return None
[docs] def list(self, path): """GET /<path>?list=true :param path: :type path: :return: :rtype: """ try: payload = { 'list': True } return self._adapter.get('/v1/{0}'.format(path), params=payload).json() except exceptions.InvalidPath: return None
[docs] def write(self, path, wrap_ttl=None, **kwargs): """POST /<path> :param path: :type path: :param wrap_ttl: :type wrap_ttl: :param kwargs: :type kwargs: :return: :rtype: """ response = self._adapter.post('/v1/{0}'.format(path), json=kwargs, wrap_ttl=wrap_ttl) if response.status_code == 200: return response.json()
[docs] def delete(self, path): """DELETE /<path> :param path: :type path: :return: :rtype: """ self._adapter.delete('/v1/{0}'.format(path))
[docs] def get_policy(self, name, parse=False): """Retrieve the policy body for the named policy. :param name: The name of the policy to retrieve. :type name: str | unicode :param parse: Specifies whether to parse the policy body using pyhcl or not. :type parse: bool :return: The (optionally parsed) policy body for the specified policy. :rtype: str | dict """ try: policy = self.sys.read_policy(name=name)['data']['rules'] except exceptions.InvalidPath: return None if parse: if not has_hcl_parser: raise ImportError('pyhcl is required for policy parsing') policy = hcl.loads(policy) return policy
[docs] def revoke_self_token(self): """PUT /auth/token/revoke-self :return: :rtype: """ self._adapter.put('/v1/auth/token/revoke-self')
[docs] def create_token(self, role=None, token_id=None, policies=None, meta=None, no_parent=False, lease=None, display_name=None, num_uses=None, no_default_policy=False, ttl=None, orphan=False, wrap_ttl=None, renewable=None, explicit_max_ttl=None, period=None, token_type=None): """POST /auth/token/create POST /auth/token/create/<role> POST /auth/token/create-orphan :param role: :type role: :param token_id: :type token_id: :param policies: :type policies: :param meta: :type meta: :param no_parent: :type no_parent: :param lease: :type lease: :param display_name: :type display_name: :param num_uses: :type num_uses: :param no_default_policy: :type no_default_policy: :param ttl: :type ttl: :param orphan: :type orphan: :param wrap_ttl: :type wrap_ttl: :param renewable: :type renewable: :param explicit_max_ttl: :type explicit_max_ttl: :param period: :type period: :param token_type: :type token_type: :return: :rtype: """ params = { 'id': token_id, 'policies': policies, 'meta': meta, 'no_parent': no_parent, 'display_name': display_name, 'num_uses': num_uses, 'no_default_policy': no_default_policy, 'renewable': renewable } if lease: params['lease'] = lease else: params['ttl'] = ttl params['explicit_max_ttl'] = explicit_max_ttl if explicit_max_ttl: params['explicit_max_ttl'] = explicit_max_ttl if period: params['period'] = period if token_type: params['type'] = token_type if orphan: return self._adapter.post('/v1/auth/token/create-orphan', json=params, wrap_ttl=wrap_ttl).json() elif role: return self._adapter.post('/v1/auth/token/create/{0}'.format(role), json=params, wrap_ttl=wrap_ttl).json() else: return self._adapter.post('/v1/auth/token/create', json=params, wrap_ttl=wrap_ttl).json()
[docs] def lookup_token(self, token=None, accessor=False, wrap_ttl=None): """GET /auth/token/lookup/<token> GET /auth/token/lookup-accessor/<token-accessor> GET /auth/token/lookup-self :param token: :type token: str. :param accessor: :type accessor: str. :param wrap_ttl: :type wrap_ttl: int. :return: :rtype: """ token_param = { 'token': token, } accessor_param = { 'accessor': token, } if token: if accessor: path = '/v1/auth/token/lookup-accessor' return self._adapter.post(path, json=accessor_param, wrap_ttl=wrap_ttl).json() else: path = '/v1/auth/token/lookup' return self._adapter.post(path, json=token_param).json() else: path = '/v1/auth/token/lookup-self' return self._adapter.get(path, wrap_ttl=wrap_ttl).json()
[docs] def revoke_token(self, token, orphan=False, accessor=False): """POST /auth/token/revoke POST /auth/token/revoke-orphan POST /auth/token/revoke-accessor :param token: :type token: :param orphan: :type orphan: :param accessor: :type accessor: :return: :rtype: """ if accessor and orphan: msg = "revoke_token does not support 'orphan' and 'accessor' flags together" raise exceptions.InvalidRequest(msg) elif accessor: params = {'accessor': token} self._adapter.post('/v1/auth/token/revoke-accessor', json=params) elif orphan: params = {'token': token} self._adapter.post('/v1/auth/token/revoke-orphan', json=params) else: params = {'token': token} self._adapter.post('/v1/auth/token/revoke', json=params)
[docs] def revoke_token_prefix(self, prefix): """POST /auth/token/revoke-prefix/<prefix> :param prefix: :type prefix: :return: :rtype: """ self._adapter.post('/v1/auth/token/revoke-prefix/{0}'.format(prefix))
[docs] def renew_token(self, token=None, increment=None, wrap_ttl=None): """POST /auth/token/renew POST /auth/token/renew-self :param token: :type token: :param increment: :type increment: :param wrap_ttl: :type wrap_ttl: :return: :rtype: """ params = { 'increment': increment, } if token is not None: params['token'] = token return self._adapter.post('/v1/auth/token/renew', json=params, wrap_ttl=wrap_ttl).json() else: return self._adapter.post('/v1/auth/token/renew-self', json=params, wrap_ttl=wrap_ttl).json()
[docs] def create_token_role(self, role, allowed_policies=None, disallowed_policies=None, orphan=None, period=None, renewable=None, path_suffix=None, explicit_max_ttl=None): """POST /auth/token/roles/<role> :param role: :type role: :param allowed_policies: :type allowed_policies: :param disallowed_policies: :type disallowed_policies: :param orphan: :type orphan: :param period: :type period: :param renewable: :type renewable: :param path_suffix: :type path_suffix: :param explicit_max_ttl: :type explicit_max_ttl: :return: :rtype: """ params = { 'allowed_policies': allowed_policies, 'disallowed_policies': disallowed_policies, 'orphan': orphan, 'period': period, 'renewable': renewable, 'path_suffix': path_suffix, 'explicit_max_ttl': explicit_max_ttl } return self._adapter.post('/v1/auth/token/roles/{0}'.format(role), json=params)
[docs] def token_role(self, role): """Returns the named token role. :param role: :type role: :return: :rtype: """ return self.read('auth/token/roles/{0}'.format(role))
[docs] def delete_token_role(self, role): """Deletes the named token role. :param role: :type role: :return: :rtype: """ return self.delete('auth/token/roles/{0}'.format(role))
[docs] def list_token_roles(self): """GET /auth/token/roles?list=true :return: :rtype: """ return self.list('auth/token/roles')
[docs] def logout(self, revoke_token=False): """Clears the token used for authentication, optionally revoking it before doing so. :param revoke_token: :type revoke_token: :return: :rtype: """ if revoke_token: self.revoke_self_token() self.token = None
[docs] def is_authenticated(self): """Helper method which returns the authentication status of the client :return: :rtype: """ if not self.token: return False try: self.lookup_token() return True except exceptions.Forbidden: return False except exceptions.InvalidPath: return False except exceptions.InvalidRequest: return False
[docs] def auth_app_id(self, app_id, user_id, mount_point='app-id', use_token=True): """POST /auth/<mount point>/login :param app_id: :type app_id: :param user_id: :type user_id: :param mount_point: :type mount_point: :param use_token: :type use_token: :return: :rtype: """ params = { 'app_id': app_id, 'user_id': user_id, } return self.login('/v1/auth/{0}/login'.format(mount_point), json=params, use_token=use_token)
[docs] def auth_tls(self, mount_point='cert', use_token=True): """POST /auth/<mount point>/login :param mount_point: :type mount_point: :param use_token: :type use_token: :return: :rtype: """ return self.login('/v1/auth/{0}/login'.format(mount_point), use_token=use_token)
[docs] def auth_userpass(self, username, password, mount_point='userpass', use_token=True, **kwargs): """POST /auth/<mount point>/login/<username> :param username: :type username: :param password: :type password: :param mount_point: :type mount_point: :param use_token: :type use_token: :param kwargs: :type kwargs: :return: :rtype: """ params = { 'password': password, } params.update(kwargs) return self.login('/v1/auth/{0}/login/{1}'.format(mount_point, username), json=params, use_token=use_token)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.iam_login, ) def auth_aws_iam(self, access_key, secret_key, session_token=None, header_value=None, mount_point='aws', role='', use_token=True, region='us-east-1'): """POST /auth/<mount point>/login :param access_key: AWS IAM access key ID :type access_key: str :param secret_key: AWS IAM secret access key :type secret_key: str :param session_token: Optional AWS IAM session token retrieved via a GetSessionToken AWS API request. see: https://docs.aws.amazon.com/STS/latest/APIReference/API_GetSessionToken.html :type session_token: str :param header_value: Vault allows you to require an additional header, X-Vault-AWS-IAM-Server-ID, to be present to mitigate against different types of replay attacks. Depending on the configuration of the AWS auth backend, providing a argument to this optional parameter may be required. :type header_value: str :param mount_point: The "path" the AWS auth backend was mounted on. Vault currently defaults to "aws". "aws-ec2" is the default argument for backwards comparability within this module. :type mount_point: str :param role: Name of the role against which the login is being attempted. If role is not specified, then the login endpoint looks for a role bearing the name of the AMI ID of the EC2 instance that is trying to login if using the ec2 auth method, or the "friendly name" (i.e., role name or username) of the IAM principal authenticated. If a matching role is not found, login fails. :type role: str :param use_token: If True, uses the token in the response received from the auth request to set the "token" attribute on the current Client class instance. :type use_token: bool. :return: The response from the AWS IAM login request attempt. :rtype: requests.Response """ request = aws_utils.generate_sigv4_auth_request(header_value=header_value) auth = aws_utils.SigV4Auth(access_key, secret_key, session_token, region) auth.add_auth(request) # https://github.com/hashicorp/vault/blob/master/builtin/credential/aws/cli.go headers = json.dumps({k: [request.headers[k]] for k in request.headers}) params = { 'iam_http_request_method': request.method, 'iam_request_url': b64encode(request.url.encode('utf-8')).decode('utf-8'), 'iam_request_headers': b64encode(headers.encode('utf-8')).decode('utf-8'), 'iam_request_body': b64encode(request.body.encode('utf-8')).decode('utf-8'), 'role': role, } return self.login('/v1/auth/{0}/login'.format(mount_point), json=params, use_token=use_token)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.ec2_login, ) def auth_ec2(self, pkcs7, nonce=None, role=None, use_token=True, mount_point='aws-ec2'): """POST /auth/<mount point>/login :param pkcs7: PKCS#7 version of an AWS Instance Identity Document from the EC2 Metadata Service. :type pkcs7: str. :param nonce: Optional nonce returned as part of the original authentication request. Not required if the backend has "allow_instance_migration" or "disallow_reauthentication" options turned on. :type nonce: str. :param role: Identifier for the AWS auth backend role being requested. :type role: str. :param use_token: If True, uses the token in the response received from the auth request to set the "token" attribute on the current Client class instance. :type use_token: bool. :param mount_point: The "path" the AWS auth backend was mounted on. Vault currently defaults to "aws". "aws-ec2" is the default argument for backwards comparability within this module. :type mount_point: str. :return: parsed JSON response from the auth POST request :rtype: dict. """ params = {'pkcs7': pkcs7} if nonce: params['nonce'] = nonce if role: params['role'] = role return self.login('/v1/auth/{0}/login'.format(mount_point), json=params, use_token=use_token)
[docs] def create_userpass(self, username, password, policies, mount_point='userpass', **kwargs): """POST /auth/<mount point>/users/<username> :param username: :type username: :param password: :type password: :param policies: :type policies: :param mount_point: :type mount_point: :param kwargs: :type kwargs: :return: :rtype: """ # Users can have more than 1 policy. It is easier for the user to pass in the # policies as a list so if they do, we need to convert to a , delimited string. if isinstance(policies, (list, set, tuple)): policies = ','.join(policies) params = { 'password': password, 'policies': policies } params.update(kwargs) return self._adapter.post('/v1/auth/{}/users/{}'.format(mount_point, username), json=params)
[docs] def list_userpass(self, mount_point='userpass'): """GET /auth/<mount point>/users?list=true :param mount_point: :type mount_point: :return: :rtype: """ try: return self._adapter.get('/v1/auth/{}/users'.format(mount_point), params={'list': True}).json() except exceptions.InvalidPath: return None
[docs] def read_userpass(self, username, mount_point='userpass'): """GET /auth/<mount point>/users/<username> :param username: :type username: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.get('/v1/auth/{}/users/{}'.format(mount_point, username)).json()
[docs] def update_userpass_policies(self, username, policies, mount_point='userpass'): """POST /auth/<mount point>/users/<username>/policies :param username: :type username: :param policies: :type policies: :param mount_point: :type mount_point: :return: :rtype: """ # userpass can have more than 1 policy. It is easier for the user to pass in the # policies as a list so if they do, we need to convert to a , delimited string. if isinstance(policies, (list, set, tuple)): policies = ','.join(policies) params = { 'policies': policies } return self._adapter.post('/v1/auth/{}/users/{}/policies'.format(mount_point, username), json=params)
[docs] def update_userpass_password(self, username, password, mount_point='userpass'): """POST /auth/<mount point>/users/<username>/password :param username: :type username: :param password: :type password: :param mount_point: :type mount_point: :return: :rtype: """ params = { 'password': password } return self._adapter.post('/v1/auth/{}/users/{}/password'.format(mount_point, username), json=params)
[docs] def delete_userpass(self, username, mount_point='userpass'): """DELETE /auth/<mount point>/users/<username> :param username: :type username: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.delete('/v1/auth/{}/users/{}'.format(mount_point, username))
[docs] def create_app_id(self, app_id, policies, display_name=None, mount_point='app-id', **kwargs): """POST /auth/<mount point>/map/app-id/<app_id> :param app_id: :type app_id: :param policies: :type policies: :param display_name: :type display_name: :param mount_point: :type mount_point: :param kwargs: :type kwargs: :return: :rtype: """ # app-id can have more than 1 policy. It is easier for the user to pass in the # policies as a list so if they do, we need to convert to a , delimited string. if isinstance(policies, (list, set, tuple)): policies = ','.join(policies) params = { 'value': policies } # Only use the display_name if it has a value. Made it a named param for user # convienence instead of leaving it as part of the kwargs if display_name: params['display_name'] = display_name params.update(kwargs) return self._adapter.post('/v1/auth/{}/map/app-id/{}'.format(mount_point, app_id), json=params)
[docs] def get_app_id(self, app_id, mount_point='app-id', wrap_ttl=None): """GET /auth/<mount_point>/map/app-id/<app_id> :param app_id: :type app_id: :param mount_point: :type mount_point: :param wrap_ttl: :type wrap_ttl: :return: :rtype: """ path = '/v1/auth/{0}/map/app-id/{1}'.format(mount_point, app_id) return self._adapter.get(path, wrap_ttl=wrap_ttl).json()
[docs] def delete_app_id(self, app_id, mount_point='app-id'): """DELETE /auth/<mount_point>/map/app-id/<app_id> :param app_id: :type app_id: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.delete('/v1/auth/{0}/map/app-id/{1}'.format(mount_point, app_id))
[docs] def create_user_id(self, user_id, app_id, cidr_block=None, mount_point='app-id', **kwargs): """POST /auth/<mount point>/map/user-id/<user_id> :param user_id: :type user_id: :param app_id: :type app_id: :param cidr_block: :type cidr_block: :param mount_point: :type mount_point: :param kwargs: :type kwargs: :return: :rtype: """ # user-id can be associated to more than 1 app-id (aka policy). It is easier for the user to # pass in the policies as a list so if they do, we need to convert to a , delimited string. if isinstance(app_id, (list, set, tuple)): app_id = ','.join(app_id) params = { 'value': app_id } # Only use the cidr_block if it has a value. Made it a named param for user # convienence instead of leaving it as part of the kwargs if cidr_block: params['cidr_block'] = cidr_block params.update(kwargs) return self._adapter.post('/v1/auth/{}/map/user-id/{}'.format(mount_point, user_id), json=params)
[docs] def get_user_id(self, user_id, mount_point='app-id', wrap_ttl=None): """GET /auth/<mount_point>/map/user-id/<user_id> :param user_id: :type user_id: :param mount_point: :type mount_point: :param wrap_ttl: :type wrap_ttl: :return: :rtype: """ path = '/v1/auth/{0}/map/user-id/{1}'.format(mount_point, user_id) return self._adapter.get(path, wrap_ttl=wrap_ttl).json()
[docs] def delete_user_id(self, user_id, mount_point='app-id'): """DELETE /auth/<mount_point>/map/user-id/<user_id> :param user_id: :type user_id: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.delete('/v1/auth/{0}/map/user-id/{1}'.format(mount_point, user_id))
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.configure, ) def create_vault_ec2_client_configuration(self, access_key, secret_key, endpoint=None, mount_point='aws-ec2'): """POST /auth/<mount_point>/config/client Configure the credentials required to perform API calls to AWS as well as custom endpoints to talk to AWS APIs. The instance identity document fetched from the PKCS#7 signature will provide the EC2 instance ID. The credentials configured using this endpoint will be used to query the status of the instances via DescribeInstances API. If static credentials are not provided using this endpoint, then the credentials will be retrieved from the environment variables AWS_ACCESS_KEY, AWS_SECRET_KEY and AWS_REGION respectively. If the credentials are still not found and if the method is configured on an EC2 instance with metadata querying capabilities, the credentials are fetched automatically :param access_key: AWS Access key with permissions to query AWS APIs. The permissions required depend on the specific configurations. If using the iam auth method without inferencing, then no credentials are necessary. If using the ec2 auth method or using the iam auth method with inferencing, then these credentials need access to ec2:DescribeInstances. If additionally a bound_iam_role is specified, then these credentials also need access to iam:GetInstanceProfile. If, however, an alternate sts configuration is set for the target account, then the credentials must be permissioned to call sts:AssumeRole on the configured role, and that role must have the permissions described here. :type access_key: str|unicode :param secret_key: AWS Secret key with permissions to query AWS APIs. :type secret_key: str|unicode :param endpoint: URL to override the default generated endpoint for making AWS EC2 API calls. :type endpoint: str|unicode :param mount_point: The "path" the AWS auth backend was mounted on. Vault currently defaults to "aws". "aws-ec2" is the default argument for backwards comparability within this module. :type mount_point: str|unicode :return: The response of the request. :rtype: requests.Response """ params = { 'access_key': access_key, 'secret_key': secret_key } if endpoint is not None: params['endpoint'] = endpoint return self._adapter.post('/v1/auth/{0}/config/client'.format(mount_point), json=params)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.read_config, ) def get_vault_ec2_client_configuration(self, mount_point='aws-ec2'): """GET /auth/<mount_point>/config/client :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.get('/v1/auth/{0}/config/client'.format(mount_point)).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.delete_config, ) def delete_vault_ec2_client_configuration(self, mount_point='aws-ec2'): """DELETE /auth/<mount_point>/config/client :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.delete('/v1/auth/{0}/config/client'.format(mount_point))
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.create_certificate_configuration, ) def create_vault_ec2_certificate_configuration(self, cert_name, aws_public_cert, mount_point='aws-ec2'): """POST /auth/<mount_point>/config/certificate/<cert_name> :param cert_name: :type cert_name: :param aws_public_cert: :type aws_public_cert: :param mount_point: :type mount_point: :return: :rtype: """ params = { 'cert_name': cert_name, 'aws_public_cert': aws_public_cert } return self._adapter.post('/v1/auth/{0}/config/certificate/{1}'.format(mount_point, cert_name), json=params)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.read_certificate_configuration, ) def get_vault_ec2_certificate_configuration(self, cert_name, mount_point='aws-ec2'): """GET /auth/<mount_point>/config/certificate/<cert_name> :param cert_name: :type cert_name: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.get('/v1/auth/{0}/config/certificate/{1}'.format(mount_point, cert_name)).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.list_certificate_configurations, ) def list_vault_ec2_certificate_configurations(self, mount_point='aws-ec2'): """GET /auth/<mount_point>/config/certificates?list=true :param mount_point: :type mount_point: :return: :rtype: """ params = {'list': True} return self._adapter.get('/v1/auth/{0}/config/certificates'.format(mount_point), params=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.create_role, ) def create_ec2_role(self, role, bound_ami_id=None, bound_account_id=None, bound_iam_role_arn=None, bound_iam_instance_profile_arn=None, bound_ec2_instance_id=None, bound_region=None, bound_vpc_id=None, bound_subnet_id=None, role_tag=None, ttl=None, max_ttl=None, period=None, policies=None, allow_instance_migration=False, disallow_reauthentication=False, resolve_aws_unique_ids=None, mount_point='aws-ec2'): """POST /auth/<mount_point>/role/<role> :param role: :type role: :param bound_ami_id: :type bound_ami_id: :param bound_account_id: :type bound_account_id: :param bound_iam_role_arn: :type bound_iam_role_arn: :param bound_iam_instance_profile_arn: :type bound_iam_instance_profile_arn: :param bound_ec2_instance_id: :type bound_ec2_instance_id: :param bound_region: :type bound_region: :param bound_vpc_id: :type bound_vpc_id: :param bound_subnet_id: :type bound_subnet_id: :param role_tag: :type role_tag: :param ttl: :type ttl: :param max_ttl: :type max_ttl: :param period: :type period: :param policies: :type policies: :param allow_instance_migration: :type allow_instance_migration: :param disallow_reauthentication: :type disallow_reauthentication: :param resolve_aws_unique_ids: :type resolve_aws_unique_ids: :param mount_point: :type mount_point: :return: :rtype: """ params = { 'role': role, 'auth_type': 'ec2', 'disallow_reauthentication': disallow_reauthentication, 'allow_instance_migration': allow_instance_migration } if bound_ami_id is not None: params['bound_ami_id'] = bound_ami_id if bound_account_id is not None: params['bound_account_id'] = bound_account_id if bound_iam_role_arn is not None: params['bound_iam_role_arn'] = bound_iam_role_arn if bound_ec2_instance_id is not None: params['bound_iam_instance_profile_arn'] = bound_ec2_instance_id if bound_iam_instance_profile_arn is not None: params['bound_iam_instance_profile_arn'] = bound_iam_instance_profile_arn if bound_region is not None: params['bound_region'] = bound_region if bound_vpc_id is not None: params['bound_vpc_id'] = bound_vpc_id if bound_subnet_id is not None: params['bound_subnet_id'] = bound_subnet_id if role_tag is not None: params['role_tag'] = role_tag if ttl is not None: params['ttl'] = ttl else: params['ttl'] = 0 if max_ttl is not None: params['max_ttl'] = max_ttl else: params['max_ttl'] = 0 if period is not None: params['period'] = period else: params['period'] = 0 if policies is not None: params['policies'] = policies if resolve_aws_unique_ids is not None: params['resolve_aws_unique_ids'] = resolve_aws_unique_ids return self._adapter.post('/v1/auth/{0}/role/{1}'.format(mount_point, role), json=params)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.read_role, ) def get_ec2_role(self, role, mount_point='aws-ec2'): """GET /auth/<mount_point>/role/<role> :param role: :type role: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.get('/v1/auth/{0}/role/{1}'.format(mount_point, role)).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.delete_role, ) def delete_ec2_role(self, role, mount_point='aws-ec2'): """DELETE /auth/<mount_point>/role/<role> :param role: :type role: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.delete('/v1/auth/{0}/role/{1}'.format(mount_point, role))
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.list_roles, ) def list_ec2_roles(self, mount_point='aws-ec2'): """GET /auth/<mount_point>/roles?list=true :param mount_point: :type mount_point: :return: :rtype: """ try: return self._adapter.get('/v1/auth/{0}/roles'.format(mount_point), params={'list': True}).json() except exceptions.InvalidPath: return None
[docs] @utils.deprecated_method( to_be_removed_in_version='0.11.2', new_method=api.auth_methods.Aws.create_role_tags, ) def create_ec2_role_tag(self, role, policies=None, max_ttl=None, instance_id=None, disallow_reauthentication=False, allow_instance_migration=False, mount_point='aws-ec2'): """POST /auth/<mount_point>/role/<role>/tag :param role: :type role: :param policies: :type policies: :param max_ttl: :type max_ttl: :param instance_id: :type instance_id: :param disallow_reauthentication: :type disallow_reauthentication: :param allow_instance_migration: :type allow_instance_migration: :param mount_point: :type mount_point: :return: :rtype: """ params = { 'role': role, 'disallow_reauthentication': disallow_reauthentication, 'allow_instance_migration': allow_instance_migration } if max_ttl is not None: params['max_ttl'] = max_ttl if policies is not None: params['policies'] = policies if instance_id is not None: params['instance_id'] = instance_id return self._adapter.post('/v1/auth/{0}/role/{1}/tag'.format(mount_point, role), json=params)
[docs] def auth_cubbyhole(self, token): """Perform a login request with a wrapped token. Stores the unwrapped token in the resulting Vault response for use by the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute. :param token: Wrapped token :type token: str | unicode :return: The (JSON decoded) response of the auth request :rtype: dict """ self.token = token return self.login('/v1/sys/wrapping/unwrap')
[docs] def login(self, url, use_token=True, **kwargs): """Perform a login request. Associated request is typically to a path prefixed with "/v1/auth") and optionally stores the client token sent in the resulting Vault response for use by the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute. :param url: Path to send the authentication request to. :type url: str | unicode :param use_token: if True, uses the token in the response received from the auth request to set the "token" attribute on the the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute. :type use_token: bool :param kwargs: Additional keyword arguments to include in the params sent with the request. :type kwargs: dict :return: The response of the auth request. :rtype: requests.Response """ return self._adapter.login( url=url, use_token=use_token, **kwargs )
[docs] def create_role(self, role_name, mount_point='approle', **kwargs): """POST /auth/<mount_point>/role/<role name> :param role_name: :type role_name: :param mount_point: :type mount_point: :param kwargs: :type kwargs: :return: :rtype: """ return self._adapter.post('/v1/auth/{0}/role/{1}'.format(mount_point, role_name), json=kwargs)
[docs] def delete_role(self, role_name, mount_point='approle'): """DELETE /auth/<mount_point>/role/<role name> :param role_name: :type role_name: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.delete('/v1/auth/{0}/role/{1}'.format(mount_point, role_name))
[docs] def list_roles(self, mount_point='approle'): """GET /auth/<mount_point>/role :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.get('/v1/auth/{0}/role?list=true'.format(mount_point)).json()
[docs] def get_role_id(self, role_name, mount_point='approle'): """GET /auth/<mount_point>/role/<role name>/role-id :param role_name: :type role_name: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/auth/{0}/role/{1}/role-id'.format(mount_point, role_name) return self._adapter.get(url).json()['data']['role_id']
[docs] def set_role_id(self, role_name, role_id, mount_point='approle'): """POST /auth/<mount_point>/role/<role name>/role-id :param role_name: :type role_name: :param role_id: :type role_id: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/auth/{0}/role/{1}/role-id'.format(mount_point, role_name) params = { 'role_id': role_id } return self._adapter.post(url, json=params)
[docs] def get_role(self, role_name, mount_point='approle'): """GET /auth/<mount_point>/role/<role name> :param role_name: :type role_name: :param mount_point: :type mount_point: :return: :rtype: """ return self._adapter.get('/v1/auth/{0}/role/{1}'.format(mount_point, role_name)).json()
[docs] def create_role_secret_id(self, role_name, meta=None, cidr_list=None, wrap_ttl=None, mount_point='approle'): """POST /auth/<mount_point>/role/<role name>/secret-id :param role_name: :type role_name: :param meta: :type meta: :param cidr_list: :type cidr_list: :param wrap_ttl: :type wrap_ttl: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/auth/{0}/role/{1}/secret-id'.format(mount_point, role_name) params = {} if meta is not None: params['metadata'] = json.dumps(meta) if cidr_list is not None: params['cidr_list'] = cidr_list return self._adapter.post(url, json=params, wrap_ttl=wrap_ttl).json()
[docs] def get_role_secret_id(self, role_name, secret_id, mount_point='approle'): """POST /auth/<mount_point>/role/<role name>/secret-id/lookup :param role_name: :type role_name: :param secret_id: :type secret_id: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/auth/{0}/role/{1}/secret-id/lookup'.format(mount_point, role_name) params = { 'secret_id': secret_id } return self._adapter.post(url, json=params).json()
[docs] def list_role_secrets(self, role_name, mount_point='approle'): """LIST /auth/<mount_point>/role/<role name>/secret-id :param role_name: Name of the AppRole. :type role_name: str|unicode :param mount_point: The "path" the AppRole auth backend was mounted on. Vault currently defaults to "approle". :type mount_point: str|unicode :return: The JSON response of the request. :rtype: dict """ url = '/v1/auth/{mount_point}/role/{name}/secret-id'.format( mount_point=mount_point, name=role_name ) return self._adapter.list(url).json()
[docs] def get_role_secret_id_accessor(self, role_name, secret_id_accessor, mount_point='approle'): """POST /auth/<mount_point>/role/<role name>/secret-id-accessor/lookup :param role_name: :type role_name: :param secret_id_accessor: :type secret_id_accessor: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/auth/{0}/role/{1}/secret-id-accessor/lookup'.format(mount_point, role_name) params = {'secret_id_accessor': secret_id_accessor} return self._adapter.post(url, json=params).json()
[docs] def delete_role_secret_id(self, role_name, secret_id, mount_point='approle'): """POST /auth/<mount_point>/role/<role name>/secret-id/destroy :param role_name: :type role_name: :param secret_id: :type secret_id: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/auth/{0}/role/{1}/secret-id/destroy'.format(mount_point, role_name) params = { 'secret_id': secret_id } return self._adapter.post(url, json=params)
[docs] def delete_role_secret_id_accessor(self, role_name, secret_id_accessor, mount_point='approle'): """POST /auth/<mount_point>/role/<role name>/secret-id-accessor/destroy :param role_name: :type role_name: :param secret_id_accessor: :type secret_id_accessor: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/auth/{0}/role/{1}/secret-id-accessor/destroy'.format(mount_point, role_name) params = { 'secret_id_accessor': secret_id_accessor } return self._adapter.post(url, json=params)
[docs] def create_role_custom_secret_id(self, role_name, secret_id, meta=None, mount_point='approle'): """POST /auth/<mount_point>/role/<role name>/custom-secret-id :param role_name: :type role_name: :param secret_id: :type secret_id: :param meta: :type meta: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/auth/{0}/role/{1}/custom-secret-id'.format(mount_point, role_name) params = { 'secret_id': secret_id } if meta is not None: params['meta'] = meta return self._adapter.post(url, json=params).json()
[docs] def auth_approle(self, role_id, secret_id=None, mount_point='approle', use_token=True): """POST /auth/<mount_point>/login :param role_id: :type role_id: :param secret_id: :type secret_id: :param mount_point: :type mount_point: :param use_token: :type use_token: :return: :rtype: """ params = { 'role_id': role_id } if secret_id is not None: params['secret_id'] = secret_id return self.login('/v1/auth/{0}/login'.format(mount_point), json=params, use_token=use_token)
[docs] def create_kubernetes_configuration(self, kubernetes_host, kubernetes_ca_cert=None, token_reviewer_jwt=None, pem_keys=None, mount_point='kubernetes'): """POST /auth/<mount_point>/config :param kubernetes_host: A host:port pair, or a URL to the base of the Kubernetes API server. :type kubernetes_host: str. :param kubernetes_ca_cert: PEM encoded CA cert for use by the TLS client used to talk with the Kubernetes API. :type kubernetes_ca_cert: str. :param token_reviewer_jwt: A service account JWT used to access the TokenReview API to validate other JWTs during login. If not set the JWT used for login will be used to access the API. :type token_reviewer_jwt: str. :param pem_keys: Optional list of PEM-formated public keys or certificates used to verify the signatures of Kubernetes service account JWTs. If a certificate is given, its public key will be extracted. Not every installation of Kubernetes exposes these keys. :type pem_keys: list. :param mount_point: The "path" the k8s auth backend was mounted on. Vault currently defaults to "kubernetes". :type mount_point: str. :return: Will be an empty body with a 204 status code upon success :rtype: requests.Response. """ params = { 'kubernetes_host': kubernetes_host, 'kubernetes_ca_cert': kubernetes_ca_cert, } if token_reviewer_jwt is not None: params['token_reviewer_jwt'] = token_reviewer_jwt if pem_keys is not None: params['pem_keys'] = pem_keys url = 'v1/auth/{0}/config'.format(mount_point) return self._adapter.post(url, json=params)
[docs] def get_kubernetes_configuration(self, mount_point='kubernetes'): """GET /auth/<mount_point>/config :param mount_point: The "path" the k8s auth backend was mounted on. Vault currently defaults to "kubernetes". :type mount_point: str. :return: Parsed JSON response from the config GET request :rtype: dict. """ url = '/v1/auth/{0}/config'.format(mount_point) return self._adapter.get(url).json()
[docs] def create_kubernetes_role(self, name, bound_service_account_names, bound_service_account_namespaces, ttl="", max_ttl="", period="", policies=None, mount_point='kubernetes'): """POST /auth/<mount_point>/role/:name :param name: Name of the role. :type name: str. :param bound_service_account_names: List of service account names able to access this role. If set to "*" all names are allowed, both this and bound_service_account_namespaces can not be "*". :type bound_service_account_names: list. :param bound_service_account_namespaces: List of namespaces allowed to access this role. If set to "*" all namespaces are allowed, both this and bound_service_account_names can not be set to "*". :type bound_service_account_namespaces: list. :param ttl: The TTL period of tokens issued using this role in seconds. :type ttl: str. :param max_ttl: The maximum allowed lifetime of tokens issued in seconds using this role. :type max_ttl: str. :param period: If set, indicates that the token generated using this role should never expire. The token should be renewed within the duration specified by this value. At each renewal, the token's TTL will be set to the value of this parameter. :type period: str. :param policies: Policies to be set on tokens issued using this role :type policies: list. :param mount_point: The "path" the k8s auth backend was mounted on. Vault currently defaults to "kubernetes". :type mount_point: str. :return: Will be an empty body with a 204 status code upon success :rtype: requests.Response. """ if bound_service_account_names == '*' and bound_service_account_namespaces == '*': error_message = 'bound_service_account_names and bound_service_account_namespaces can not both be set to "*"' raise exceptions.ParamValidationError(error_message) params = { 'bound_service_account_names': bound_service_account_names, 'bound_service_account_namespaces': bound_service_account_namespaces, 'ttl': ttl, 'max_ttl': max_ttl, 'period': period, 'policies': policies, } url = 'v1/auth/{0}/role/{1}'.format(mount_point, name) return self._adapter.post(url, json=params)
[docs] def get_kubernetes_role(self, name, mount_point='kubernetes'): """GET /auth/<mount_point>/role/:name :param name: Name of the role. :type name: str. :param mount_point: The "path" the k8s auth backend was mounted on. Vault currently defaults to "kubernetes". :type mount_point: str. :return: Parsed JSON response from the read role GET request :rtype: dict. """ url = 'v1/auth/{0}/role/{1}'.format(mount_point, name) return self._adapter.get(url).json()
[docs] def list_kubernetes_roles(self, mount_point='kubernetes'): """GET /auth/<mount_point>/role?list=true :param mount_point: The "path" the k8s auth backend was mounted on. Vault currently defaults to "kubernetes". :type mount_point: str. :return: Parsed JSON response from the list roles GET request. :rtype: dict. """ url = 'v1/auth/{0}/role?list=true'.format(mount_point) return self._adapter.get(url).json()
[docs] def delete_kubernetes_role(self, role, mount_point='kubernetes'): """DELETE /auth/<mount_point>/role/:role :type role: Name of the role. :param role: str. :param mount_point: The "path" the k8s auth backend was mounted on. Vault currently defaults to "kubernetes". :type mount_point: str. :return: Will be an empty body with a 204 status code upon success. :rtype: requests.Response. """ url = 'v1/auth/{0}/role/{1}'.format(mount_point, role) return self._adapter.delete(url)
[docs] def auth_kubernetes(self, role, jwt, use_token=True, mount_point='kubernetes'): """POST /auth/<mount_point>/login :param role: Name of the role against which the login is being attempted. :type role: str. :param jwt: Signed JSON Web Token (JWT) for authenticating a service account. :type jwt: str. :param use_token: if True, uses the token in the response received from the auth request to set the "token" attribute on the current Client class instance. :type use_token: bool. :param mount_point: The "path" the k8s auth backend was mounted on. Vault currently defaults to "kubernetes". :type mount_point: str. :return: Parsed JSON response from the config POST request. :rtype: dict. """ params = { 'role': role, 'jwt': jwt } url = 'v1/auth/{0}/login'.format(mount_point) return self.login(url, json=params, use_token=use_token)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.create_key, ) def transit_create_key(self, name, convergent_encryption=None, derived=None, exportable=None, key_type=None, mount_point='transit'): """POST /<mount_point>/keys/<name> :param name: :type name: :param convergent_encryption: :type convergent_encryption: :param derived: :type derived: :param exportable: :type exportable: :param key_type: :type key_type: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/keys/{1}'.format(mount_point, name) params = {} if convergent_encryption is not None: params['convergent_encryption'] = convergent_encryption if derived is not None: params['derived'] = derived if exportable is not None: params['exportable'] = exportable if key_type is not None: params['type'] = key_type return self._adapter.post(url, json=params)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.read_key, ) def transit_read_key(self, name, mount_point='transit'): """GET /<mount_point>/keys/<name> :param name: :type name: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/keys/{1}'.format(mount_point, name) return self._adapter.get(url).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.list_keys, ) def transit_list_keys(self, mount_point='transit'): """GET /<mount_point>/keys?list=true :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/keys?list=true'.format(mount_point) return self._adapter.get(url).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.delete_key, ) def transit_delete_key(self, name, mount_point='transit'): """DELETE /<mount_point>/keys/<name> :param name: :type name: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/keys/{1}'.format(mount_point, name) return self._adapter.delete(url)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.update_key_configuration, ) def transit_update_key(self, name, min_decryption_version=None, min_encryption_version=None, deletion_allowed=None, mount_point='transit'): """POST /<mount_point>/keys/<name>/config :param name: :type name: :param min_decryption_version: :type min_decryption_version: :param min_encryption_version: :type min_encryption_version: :param deletion_allowed: :type deletion_allowed: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/keys/{1}/config'.format(mount_point, name) params = {} if min_decryption_version is not None: params['min_decryption_version'] = min_decryption_version if min_encryption_version is not None: params['min_encryption_version'] = min_encryption_version if deletion_allowed is not None: params['deletion_allowed'] = deletion_allowed return self._adapter.post(url, json=params)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.rotate_key, ) def transit_rotate_key(self, name, mount_point='transit'): """POST /<mount_point>/keys/<name>/rotate :param name: :type name: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/keys/{1}/rotate'.format(mount_point, name) return self._adapter.post(url)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.export_key, ) def transit_export_key(self, name, key_type, version=None, mount_point='transit'): """GET /<mount_point>/export/<key_type>/<name>(/<version>) :param name: :type name: :param key_type: :type key_type: :param version: :type version: :param mount_point: :type mount_point: :return: :rtype: """ if version is not None: url = '/v1/{0}/export/{1}/{2}/{3}'.format(mount_point, key_type, name, version) else: url = '/v1/{0}/export/{1}/{2}'.format(mount_point, key_type, name) return self._adapter.get(url).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.encrypt_data, ) def transit_encrypt_data(self, name, plaintext, context=None, key_version=None, nonce=None, batch_input=None, key_type=None, convergent_encryption=None, mount_point='transit'): """POST /<mount_point>/encrypt/<name> :param name: :type name: :param plaintext: :type plaintext: :param context: :type context: :param key_version: :type key_version: :param nonce: :type nonce: :param batch_input: :type batch_input: :param key_type: :type key_type: :param convergent_encryption: :type convergent_encryption: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/encrypt/{1}'.format(mount_point, name) params = { 'plaintext': plaintext } if context is not None: params['context'] = context if key_version is not None: params['key_version'] = key_version if nonce is not None: params['nonce'] = nonce if batch_input is not None: params['batch_input'] = batch_input if key_type is not None: params['type'] = key_type if convergent_encryption is not None: params['convergent_encryption'] = convergent_encryption return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.decrypt_data, ) def transit_decrypt_data(self, name, ciphertext, context=None, nonce=None, batch_input=None, mount_point='transit'): """POST /<mount_point>/decrypt/<name> :param name: :type name: :param ciphertext: :type ciphertext: :param context: :type context: :param nonce: :type nonce: :param batch_input: :type batch_input: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/decrypt/{1}'.format(mount_point, name) params = { 'ciphertext': ciphertext } if context is not None: params['context'] = context if nonce is not None: params['nonce'] = nonce if batch_input is not None: params['batch_input'] = batch_input return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.rewrap_data, ) def transit_rewrap_data(self, name, ciphertext, context=None, key_version=None, nonce=None, batch_input=None, mount_point='transit'): """POST /<mount_point>/rewrap/<name> :param name: :type name: :param ciphertext: :type ciphertext: :param context: :type context: :param key_version: :type key_version: :param nonce: :type nonce: :param batch_input: :type batch_input: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/rewrap/{1}'.format(mount_point, name) params = { 'ciphertext': ciphertext } if context is not None: params['context'] = context if key_version is not None: params['key_version'] = key_version if nonce is not None: params['nonce'] = nonce if batch_input is not None: params['batch_input'] = batch_input return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.generate_data_key, ) def transit_generate_data_key(self, name, key_type, context=None, nonce=None, bits=None, mount_point='transit'): """POST /<mount_point>/datakey/<type>/<name> :param name: :type name: :param key_type: :type key_type: :param context: :type context: :param nonce: :type nonce: :param bits: :type bits: :param mount_point: :type mount_point: :return: :rtype: """ url = '/v1/{0}/datakey/{1}/{2}'.format(mount_point, key_type, name) params = {} if context is not None: params['context'] = context if nonce is not None: params['nonce'] = nonce if bits is not None: params['bits'] = bits return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.generate_random_bytes, ) def transit_generate_rand_bytes(self, data_bytes=None, output_format=None, mount_point='transit'): """POST /<mount_point>/random(/<data_bytes>) :param data_bytes: :type data_bytes: :param output_format: :type output_format: :param mount_point: :type mount_point: :return: :rtype: """ if data_bytes is not None: url = '/v1/{0}/random/{1}'.format(mount_point, data_bytes) else: url = '/v1/{0}/random'.format(mount_point) params = {} if output_format is not None: params["format"] = output_format return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.hash_data, ) def transit_hash_data(self, hash_input, algorithm=None, output_format=None, mount_point='transit'): """POST /<mount_point>/hash(/<algorithm>) :param hash_input: :type hash_input: :param algorithm: :type algorithm: :param output_format: :type output_format: :param mount_point: :type mount_point: :return: :rtype: """ if algorithm is not None: url = '/v1/{0}/hash/{1}'.format(mount_point, algorithm) else: url = '/v1/{0}/hash'.format(mount_point) params = { 'input': hash_input } if output_format is not None: params['format'] = output_format return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.generate_hmac, ) def transit_generate_hmac(self, name, hmac_input, key_version=None, algorithm=None, mount_point='transit'): """POST /<mount_point>/hmac/<name>(/<algorithm>) :param name: :type name: :param hmac_input: :type hmac_input: :param key_version: :type key_version: :param algorithm: :type algorithm: :param mount_point: :type mount_point: :return: :rtype: """ if algorithm is not None: url = '/v1/{0}/hmac/{1}/{2}'.format(mount_point, name, algorithm) else: url = '/v1/{0}/hmac/{1}'.format(mount_point, name) params = { 'input': hmac_input } if key_version is not None: params['key_version'] = key_version return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.sign_data, ) def transit_sign_data(self, name, input_data, key_version=None, algorithm=None, context=None, prehashed=None, mount_point='transit', signature_algorithm='pss'): """POST /<mount_point>/sign/<name>(/<algorithm>) :param name: :type name: :param input_data: :type input_data: :param key_version: :type key_version: :param algorithm: :type algorithm: :param context: :type context: :param prehashed: :type prehashed: :param mount_point: :type mount_point: :param signature_algorithm: :type signature_algorithm: :return: :rtype: """ if algorithm is not None: url = '/v1/{0}/sign/{1}/{2}'.format(mount_point, name, algorithm) else: url = '/v1/{0}/sign/{1}'.format(mount_point, name) params = { 'input': input_data } if key_version is not None: params['key_version'] = key_version if context is not None: params['context'] = context if prehashed is not None: params['prehashed'] = prehashed params['signature_algorithm'] = signature_algorithm return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.secrets_engines.Transit.verify_signed_data, ) def transit_verify_signed_data(self, name, input_data, algorithm=None, signature=None, hmac=None, context=None, prehashed=None, mount_point='transit', signature_algorithm='pss'): """POST /<mount_point>/verify/<name>(/<algorithm>) :param name: :type name: :param input_data: :type input_data: :param algorithm: :type algorithm: :param signature: :type signature: :param hmac: :type hmac: :param context: :type context: :param prehashed: :type prehashed: :param mount_point: :type mount_point: :param signature_algorithm: :type signature_algorithm: :return: :rtype: """ if algorithm is not None: url = '/v1/{0}/verify/{1}/{2}'.format(mount_point, name, algorithm) else: url = '/v1/{0}/verify/{1}'.format(mount_point, name) params = { 'input': input_data } if signature is not None: params['signature'] = signature if hmac is not None: params['hmac'] = hmac if context is not None: params['context'] = context if prehashed is not None: params['prehashed'] = prehashed params['signature_algorithm'] = signature_algorithm return self._adapter.post(url, json=params).json()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.unwrap, ) def unwrap(self, token=None): return self.sys.unwrap(token=token)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.list_policies, ) def list_policies(self): policies = self.sys.list_policies()['data']['policies'] return policies
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.create_or_update_policy, ) def set_policy(self, name, rules): """Add a new or update an existing policy. Once a policy is updated, it takes effect immediately to all associated users. Supported methods: PUT: /sys/policy/{name}. Produces: 204 (empty body) :param name: Specifies the name of the policy to create. :type name: str | unicode :param policy: Specifies the policy document. :type policy: str | unicode | dict """ if isinstance(rules, dict): rules = json.dumps(rules) params = { 'rules': rules, } api_path = '/v1/sys/policy/{name}'.format( name=name, ) self._adapter.put(api_path, json=params)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.delete_policy, ) def delete_policy(self, name): self.sys.delete_policy(name=name)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.is_sealed, ) def is_sealed(self): return self.sys.is_sealed()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.seal, ) def seal(self): self.sys.seal()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.submit_unseal_key, ) def unseal_reset(self): return self.sys.submit_unseal_key(reset=True)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.submit_unseal_key, ) def unseal(self, key): return self.sys.submit_unseal_key(key=key)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.submit_unseal_keys, ) def unseal_multi(self, keys): return self.sys.submit_unseal_keys(keys=keys)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.list_mounted_secrets_engines, ) def list_secret_backends(self): return self.sys.list_mounted_secrets_engines()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.enable_secrets_engine, ) def enable_secret_backend(self, backend_type, description=None, mount_point=None, config=None, options=None): return self.sys.enable_secrets_engine( backend_type=backend_type, path=mount_point, description=description, config=config, options=options, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.tune_mount_configuration, ) def tune_secret_backend(self, backend_type, mount_point=None, default_lease_ttl=None, max_lease_ttl=None, description=None, audit_non_hmac_request_keys=None, audit_non_hmac_response_keys=None, listing_visibility=None, passthrough_request_headers=None): if not mount_point: mount_point = backend_type return self.sys.tune_mount_configuration( path=mount_point, default_lease_ttl=default_lease_ttl, max_lease_ttl=max_lease_ttl, description=description, audit_non_hmac_request_keys=audit_non_hmac_request_keys, audit_non_hmac_response_keys=audit_non_hmac_response_keys, listing_visibility=listing_visibility, passthrough_request_headers=passthrough_request_headers, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.read_mount_configuration, ) def get_secret_backend_tuning(self, backend_type, mount_point=None): """GET /sys/mounts/<mount point>/tune :param backend_type: Name of the secret engine. E.g. "aws". :type backend_type: str | unicode :param mount_point: Alternate argument for backend_type. :type mount_point: str | unicode :return: The specified mount's configuration. :rtype: dict """ if not mount_point: mount_point = backend_type return self.sys.read_mount_configuration( path=mount_point, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.disable_secrets_engine, ) def disable_secret_backend(self, mount_point): return self.sys.disable_secrets_engine( path=mount_point, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.move_backend, ) def remount_secret_backend(self, from_mount_point, to_mount_point): return self.sys.move_backend( from_path=from_mount_point, to_path=to_mount_point, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.read_lease, ) def read_lease(self, lease_id): return self.sys.read_lease( lease_id=lease_id, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.renew_lease, ) def renew_secret(self, lease_id, increment=None): return self.sys.renew_lease( lease_id=lease_id, increment=increment, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.revoke_lease, ) def revoke_secret(self, lease_id): return self.sys.revoke_lease( lease_id=lease_id, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.revoke_lease, ) def revoke_secret_prefix(self, path_prefix): self.sys.revoke_prefix( prefix=path_prefix, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.rotate_encryption_key, ) def rotate(self): self.sys.rotate_encryption_key()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.start_rekey, ) def start_rekey(self, secret_shares=5, secret_threshold=3, pgp_keys=None, backup=False): return self.sys.start_rekey( secret_shares=secret_shares, secret_threshold=secret_threshold, pgp_keys=pgp_keys, backup=backup, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.cancel_rekey, ) def cancel_rekey(self): return self.sys.cancel_rekey()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.rekey, ) def rekey(self, key, nonce=None): self.sys.rekey( key=key, nonce=nonce, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.rekey_multi, ) def rekey_multi(self, keys, nonce=None): return self.sys.rekey_multi( keys=keys, nonce=nonce, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.read_backup_keys, ) def get_backed_up_keys(self): return self.sys.read_backup_keys()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.is_initialized, ) def is_initialized(self): return self.sys.is_initialized()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.initialize, ) def initialize(self, secret_shares=5, secret_threshold=3, pgp_keys=None): return self.sys.initialize( secret_shares=secret_shares, secret_threshold=secret_threshold, pgp_keys=pgp_keys, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.start_root_token_generation, ) def start_generate_root(self, key, otp=False): params = {} if otp: params['otp'] = key else: params['pgp_key'] = key return self.sys.start_root_token_generation(**params)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.generate_root, ) def generate_root(self, key, nonce): return self.sys.generate_root( key=key, nonce=nonce, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.cancel_root_generation, ) def cancel_generate_root(self): return self.sys.cancel_root_generation().status_code == 204
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.list_auth_methods, ) def list_auth_backends(self): return self.sys.list_auth_methods()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.enable_auth_method, ) def enable_auth_backend(self, backend_type, description=None, mount_point=None, config=None, plugin_name=None): return self.sys.enable_auth_method( method_type=backend_type, description=description, config=config, path=mount_point, plugin_name=plugin_name, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.tune_auth_method, ) def tune_auth_backend(self, backend_type, mount_point=None, default_lease_ttl=None, max_lease_ttl=None, description=None, audit_non_hmac_request_keys=None, audit_non_hmac_response_keys=None, listing_visibility="", passthrough_request_headers=None): if not mount_point: mount_point = backend_type return self.sys.tune_auth_method( path=mount_point, default_lease_ttl=default_lease_ttl, max_lease_ttl=max_lease_ttl, description=description, audit_non_hmac_request_keys=audit_non_hmac_request_keys, audit_non_hmac_response_keys=audit_non_hmac_response_keys, listing_visibility=listing_visibility, passthrough_request_headers=passthrough_request_headers, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.read_auth_method_tuning, ) def get_auth_backend_tuning(self, backend_type, mount_point=None): if not mount_point: mount_point = backend_type return self.sys.read_auth_method_tuning( path=mount_point, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.disable_auth_method, ) def disable_auth_backend(self, mount_point): return self.sys.disable_auth_method( path=mount_point, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.list_enabled_audit_devices, ) def list_audit_backends(self): return self.sys.list_enabled_audit_devices()
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.enable_audit_device, ) def enable_audit_backend(self, backend_type, description=None, options=None, name=None): self.sys.enable_audit_device( device_type=backend_type, description=description, options=options, path=name, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.disable_audit_device, ) def disable_audit_backend(self, name): self.sys.disable_audit_device( path=name, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.SystemBackend.calculate_hash, ) def audit_hash(self, name, input): return self.sys.calculate_hash( path=name, input_to_hash=input, )
[docs] @utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=api.auth_methods.Ldap.login, ) def auth_ldap(self, *args, **kwargs): return self.auth.ldap.login(*args, **kwargs)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.9.0', new_method=api.auth_methods.Gcp.login, ) def auth_gcp(self, *args, **kwargs): return self.auth.gcp.login(*args, **kwargs)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=api.auth_methods.Github.login, ) def auth_github(self, *args, **kwargs): return self.auth.github.login(*args, **kwargs)
[docs] @utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=adapters.Request.close, ) def close(self): return self._adapter.close()
@utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=adapters.Request.get, ) def _get(self, *args, **kwargs): return self._adapter.get(*args, **kwargs) @utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=adapters.Request.post, ) def _post(self, *args, **kwargs): return self._adapter.post(*args, **kwargs) @utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=adapters.Request.put, ) def _put(self, *args, **kwargs): return self._adapter.put(*args, **kwargs) @utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=adapters.Request.delete, ) def _delete(self, *args, **kwargs): return self._adapter.delete(*args, **kwargs)
[docs] @staticmethod @utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=adapters.Request.urljoin, ) def urljoin(*args): return adapters.Request.urljoin(*args)
@utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=adapters.Request.request, ) def __request(self, *args, **kwargs): return self._adapter.request(*args, **kwargs) @utils.deprecated_method( to_be_removed_in_version='0.8.0', new_method=utils.raise_for_error, ) def __raise_error(self, *args, **kwargs): utils.raise_for_error(*args, **kwargs)