Source code for hvac.api.secrets_engines.azure

#!/usr/bin/env python
"""Azure secret engine methods module."""
import json

from hvac import exceptions, utils
from hvac.api.vault_api_base import VaultApiBase
from hvac.constants.azure import VALID_ENVIRONMENTS

DEFAULT_MOUNT_POINT = "azure"


[docs]class Azure(VaultApiBase): """Azure Secrets Engine (API). Reference: https://www.vaultproject.io/api/secret/azure/index.html """
[docs] def configure( self, subscription_id, tenant_id, client_id=None, client_secret=None, environment=None, mount_point=DEFAULT_MOUNT_POINT, ): """Configure the credentials required for the plugin to perform API calls to Azure. These credentials will be used to query roles and create/delete service principals. Environment variables will override any parameters set in the config. Supported methods: POST: /{mount_point}/config. Produces: 204 (empty body) :param subscription_id: The subscription id for the Azure Active Directory :type subscription_id: str | unicode :param tenant_id: The tenant id for the Azure Active Directory. :type tenant_id: str | unicode :param client_id: The OAuth2 client id to connect to Azure. :type client_id: str | unicode :param client_secret: The OAuth2 client secret to connect to Azure. :type client_secret: str | unicode :param environment: The Azure environment. If not specified, Vault will use Azure Public Cloud. :type environment: str | unicode :param mount_point: The OAuth2 client secret to connect to Azure. :type mount_point: str | unicode :return: The response of the request. :rtype: requests.Response """ if environment is not None and environment not in VALID_ENVIRONMENTS: error_msg = 'invalid environment argument provided "{arg}", supported environments: "{environments}"' raise exceptions.ParamValidationError( error_msg.format( arg=environment, environments=",".join(VALID_ENVIRONMENTS), ) ) params = { "subscription_id": subscription_id, "tenant_id": tenant_id, } params.update( utils.remove_nones( { "client_id": client_id, "client_secret": client_secret, "environment": environment, } ) ) api_path = utils.format_url("/v1/{mount_point}/config", mount_point=mount_point) return self._adapter.post( url=api_path, json=params, )
[docs] def read_config(self, mount_point=DEFAULT_MOUNT_POINT): """Read the stored configuration, omitting client_secret. Supported methods: GET: /{mount_point}/config. Produces: 200 application/json :param mount_point: The "path" the method/backend was mounted on. :type mount_point: str | unicode :return: The data key from the JSON response of the request. :rtype: dict """ api_path = utils.format_url("/v1/{mount_point}/config", mount_point=mount_point) response = self._adapter.get( url=api_path, ) return response.get("data")
[docs] def delete_config(self, mount_point=DEFAULT_MOUNT_POINT): """Delete the stored Azure configuration and credentials. Supported methods: DELETE: /auth/{mount_point}/config. Produces: 204 (empty body) :param mount_point: The "path" the method/backend was mounted on. :type mount_point: str | unicode :return: The response of the request. :rtype: requests.Response """ api_path = utils.format_url("/v1/{mount_point}/config", mount_point=mount_point) return self._adapter.delete( url=api_path, )
[docs] def create_or_update_role( self, name, azure_roles, ttl=None, max_ttl=None, mount_point=DEFAULT_MOUNT_POINT ): """Create or update a Vault role. The provided Azure roles must exist for this call to succeed. See the Azure secrets roles docs for more information about roles. Supported methods: POST: /{mount_point}/roles/{name}. Produces: 204 (empty body) :param name: Name of the role. :type name: str | unicode :param azure_roles: List of Azure roles to be assigned to the generated service principal. :type azure_roles: list(dict) :param ttl: Specifies the default TTL for service principals generated using this role. Accepts time suffixed strings ("1h") or an integer number of seconds. Defaults to the system/engine default TTL time. :type ttl: str | unicode :param max_ttl: Specifies the maximum TTL for service principals generated using this role. Accepts time suffixed strings ("1h") or an integer number of seconds. Defaults to the system/engine max TTL time. :type max_ttl: str | unicode :param mount_point: The "path" the method/backend was mounted on. :type mount_point: str | unicode :return: The response of the request. :rtype: requests.Response """ params = { "azure_roles": json.dumps(azure_roles), } params.update( utils.remove_nones( { "ttl": ttl, "max_ttl": max_ttl, } ) ) api_path = utils.format_url( "/v1/{mount_point}/roles/{name}", mount_point=mount_point, name=name, ) return self._adapter.post( url=api_path, json=params, )
[docs] def list_roles(self, mount_point=DEFAULT_MOUNT_POINT): """List all of the roles that are registered with the plugin. Supported methods: LIST: /{mount_point}/roles. Produces: 200 application/json :param mount_point: The "path" the method/backend was mounted on. :type mount_point: str | unicode :return: The data key from the JSON response of the request. :rtype: dict """ api_path = utils.format_url("/v1/{mount_point}/roles", mount_point=mount_point) response = self._adapter.list( url=api_path, ) return response.get("data")
[docs] def generate_credentials(self, name, mount_point=DEFAULT_MOUNT_POINT): """Generate a new service principal based on the named role. Supported methods: GET: /{mount_point}/creds/{name}. Produces: 200 application/json :param name: Specifies the name of the role to create credentials against. :type name: str | unicode :param mount_point: The "path" the method/backend was mounted on. :type mount_point: str | unicode :return: The data key from the JSON response of the request. :rtype: dict """ api_path = utils.format_url( "/v1/{mount_point}/creds/{name}", mount_point=mount_point, name=name, ) response = self._adapter.get( url=api_path, ) return response.get("data")