#!/usr/bin/python
# -*- coding: utf-8 -*-
""" AWS auth method module """
import logging
import json
from base64 import b64encode
from hvac import exceptions, aws_utils, utils
from hvac.api.vault_api_base import VaultApiBase
from hvac.constants.aws import ALLOWED_IAM_ALIAS_TYPES, ALLOWED_EC2_ALIAS_TYPES
from hvac.constants.aws import DEFAULT_MOUNT_POINT as AWS_DEFAULT_MOUNT_POINT
logger = logging.getLogger(__name__)
[docs]class Aws(VaultApiBase):
"""AWS Auth Method (API).
Reference: https://www.vaultproject.io/api/auth/aws/index.html
"""
def read_config(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch the client configuration stored for the AWS auth method.

    Supported methods:
        GET: /auth/{mount_point}/config/client. Produces: 200 application/json

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str | unicode
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/client',
        mount_point=mount_point,
    )
    return self._adapter.get(url=endpoint).get('data')
def delete_config(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Remove the client configuration stored for the AWS auth method.

    Supported methods:
        DELETE: /auth/{mount_point}/config/client. Produces: 204 (empty body)

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str | unicode
    :return: Whatever the adapter's ``delete`` call returns.
    :rtype: requests.Response
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/client',
        mount_point=mount_point,
    )
    result = self._adapter.delete(url=endpoint)
    return result
def read_identity_integration(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch the identity integration configuration of the AWS auth method.

    Supported methods:
        GET: /auth/{mount_point}/config/identity. Produces: 200 application/json

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str | unicode
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/identity',
        mount_point=mount_point,
    )
    return self._adapter.get(url=endpoint).get('data')
def create_certificate_configuration(self, cert_name, aws_public_cert, document_type=None, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Register an AWS public key used to verify instance identity documents.

    While the PKCS#7 signature of the identity documents have DSA digest, the identity signature will have RSA
    digest, and hence the public keys for each type varies respectively. The kind of document the key verifies
    is passed to Vault in the request's "type" field, which this method fills from ``document_type``.

    Supported methods:
        POST: /auth/{mount_point}/config/certificate/:cert_name Produces: 204 (empty body)

    :param cert_name: Name of the certificate
    :type cert_name: str | unicode
    :param aws_public_cert: Base64 encoded AWS Public key required to verify PKCS7 signature of the EC2 instance
        metadata
    :type aws_public_cert: str | unicode
    :param document_type: Takes the value of either "pkcs7" or "identity", indicating the type of document which
        can be verified using the given certificate. Omitted from the request when None.
    :type document_type: str | unicode
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str | unicode
    :return: The response of the request
    :rtype: requests.Response
    """
    params = {
        'cert_name': cert_name,
        'aws_public_cert': aws_public_cert,
    }
    # The endpoint's field is named "type"; sending it as "document_type"
    # meant the caller's choice never reached the server-side setting.
    params.update(
        utils.remove_nones({
            'type': document_type,
        })
    )
    api_path = utils.format_url('/v1/auth/{0}/config/certificate/{1}', mount_point, cert_name)
    return self._adapter.post(
        url=api_path,
        json=params,
    )
def read_certificate_configuration(self, cert_name, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch a previously registered AWS public key by name.

    Supported methods:
        GET: /v1/auth/{mount_point}/config/certificate/:cert_name Produces: 200 application/json

    :param cert_name: Name of the certificate
    :type cert_name: str | unicode
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str | unicode
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{0}/config/certificate/{1}',
        mount_point,
        cert_name,
    )
    return self._adapter.get(url=endpoint).get('data')
def delete_certificate_configuration(self, cert_name, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Delete a previously registered AWS public key by name.

    Supported methods:
        DELETE: /auth/{mount_point}/config/certificate/:cert_name Produces: 204 (empty body)

    :param cert_name: Name of the certificate
    :type cert_name: str | unicode
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str | unicode
    :return: Whatever the adapter's ``delete`` call returns.
    :rtype: requests.Response
    """
    endpoint = utils.format_url(
        '/v1/auth/{0}/config/certificate/{1}',
        mount_point,
        cert_name,
    )
    result = self._adapter.delete(url=endpoint)
    return result
def list_certificate_configurations(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Enumerate the AWS public certificates registered with the method.

    Supported methods:
        LIST: /auth/{mount_point}/config/certificates Produces: 200 application/json

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/certificates',
        mount_point=mount_point,
    )
    return self._adapter.list(url=endpoint).get('data')
def create_sts_role(self, account_id, sts_role, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Associate an STS role with a satellite AWS account.

    Satellite accounts are those which are not the account in which the Vault server is running. Vault will use
    credentials obtained by assuming these STS roles when validating IAM principals or EC2 instances in the
    particular AWS account.

    Supported methods:
        POST: /v1/auth/{mount_point}/config/sts/:account_id Produces: 204 (empty body)

    :param account_id: AWS account ID to be associated with STS role.
        If set, Vault will use assumed credentials to verify any login attempts from EC2 instances in this account.
    :type account_id: str
    :param sts_role: AWS ARN for STS role to be assumed when interacting with the account specified.
        The Vault server must have permissions to assume this role.
    :type sts_role: str
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``post`` call returns.
    :rtype: requests.Response
    """
    payload = {'account_id': account_id, 'sts_role': sts_role}
    endpoint = utils.format_url(
        '/v1/auth/{0}/config/sts/{1}',
        mount_point,
        account_id,
    )
    return self._adapter.post(url=endpoint, json=payload)
def read_sts_role(self, account_id, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch the STS role configured for an AWS account.

    Issues a GET against ``/v1/auth/{mount_point}/config/sts/{account_id}``.

    :param account_id: AWS account ID that has been previously associated with STS role.
    :type account_id: str
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{0}/config/sts/{1}',
        mount_point,
        account_id,
    )
    return self._adapter.get(url=endpoint).get('data')
def list_sts_roles(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Enumerate the AWS account IDs for which an STS role is registered.

    Issues a LIST against ``/v1/auth/{mount_point}/config/sts``.

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/sts',
        mount_point=mount_point,
    )
    return self._adapter.list(url=endpoint).get('data')
def delete_sts_role(self, account_id, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Remove a previously configured AWS account/STS role association.

    Issues a DELETE against ``/v1/auth/{mount_point}/config/sts/{account_id}``.

    :param account_id: AWS account ID whose STS role association is removed.
    :type account_id: str
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``delete`` call returns.
    :rtype: requests.Response
    """
    endpoint = utils.format_url(
        '/v1/auth/{0}/config/sts/{1}',
        mount_point,
        account_id,
    )
    result = self._adapter.delete(url=endpoint)
    return result
def read_identity_whitelist_tidy(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch the periodic identity-whitelist tidying settings.

    Issues a GET against ``/v1/auth/{mount_point}/config/tidy/identity-whitelist``.

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/tidy/identity-whitelist',
        mount_point=mount_point,
    )
    return self._adapter.get(url=endpoint).get('data')
def delete_identity_whitelist_tidy(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Remove the periodic identity-whitelist tidying settings.

    Issues a DELETE against ``/v1/auth/{mount_point}/config/tidy/identity-whitelist``.

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``delete`` call returns.
    :rtype: requests.Response
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/tidy/identity-whitelist',
        mount_point=mount_point,
    )
    result = self._adapter.delete(url=endpoint)
    return result
def read_role_tag_blacklist_tidy(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch the periodic role-tag blacklist tidying settings.

    Issues a GET against ``/v1/auth/{mount_point}/config/tidy/roletag-blacklist``.

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/tidy/roletag-blacklist',
        mount_point=mount_point,
    )
    return self._adapter.get(url=endpoint).get('data')
def delete_role_tag_blacklist_tidy(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Remove the periodic role-tag blacklist tidying settings.

    Issues a DELETE against ``/v1/auth/{mount_point}/config/tidy/roletag-blacklist``.

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``delete`` call returns.
    :rtype: requests.Response
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/config/tidy/roletag-blacklist',
        mount_point=mount_point,
    )
    result = self._adapter.delete(url=endpoint)
    return result
def create_role(self, role, auth_type=None, bound_ami_id=None, bound_account_id=None,
                bound_region=None, bound_vpc_id=None, bound_subnet_id=None, bound_iam_role_arn=None,
                bound_iam_instance_profile_arn=None, bound_ec2_instance_id=None, role_tag=None,
                bound_iam_principal_arn=None, inferred_entity_type=None, inferred_aws_region=None,
                resolve_aws_unique_ids=None, ttl=None, max_ttl=None, period=None, policies=None,
                allow_instance_migration=None, disallow_reauthentication=None,
                mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Register a role in the method.

    Posts to ``/v1/auth/{mount_point}/role/{role}``. The role name is always part of the request body; every
    other setting is forwarded under its own parameter name and only when it is not None.

    :param role: Name of the role; used in the URL and sent as the ``role`` field.
    :param auth_type: Optional; forwarded as ``auth_type``.
    :param bound_ami_id: Optional; forwarded as ``bound_ami_id``.
    :param bound_account_id: Optional; forwarded as ``bound_account_id``.
    :param bound_region: Optional; forwarded as ``bound_region``.
    :param bound_vpc_id: Optional; forwarded as ``bound_vpc_id``.
    :param bound_subnet_id: Optional; forwarded as ``bound_subnet_id``.
    :param bound_iam_role_arn: Optional; forwarded as ``bound_iam_role_arn``.
    :param bound_iam_instance_profile_arn: Optional; forwarded as ``bound_iam_instance_profile_arn``.
    :param bound_ec2_instance_id: Optional; forwarded as ``bound_ec2_instance_id``.
    :param role_tag: Optional; forwarded as ``role_tag``.
    :param bound_iam_principal_arn: Optional; forwarded as ``bound_iam_principal_arn``.
    :param inferred_entity_type: Optional; forwarded as ``inferred_entity_type``.
    :param inferred_aws_region: Optional; forwarded as ``inferred_aws_region``.
    :param resolve_aws_unique_ids: Optional; forwarded as ``resolve_aws_unique_ids``.
    :param ttl: Optional; forwarded as ``ttl``.
    :param max_ttl: Optional; forwarded as ``max_ttl``.
    :param period: Optional; forwarded as ``period``.
    :param policies: Optional; forwarded as ``policies``.
    :param allow_instance_migration: Optional; forwarded as ``allow_instance_migration``.
    :param disallow_reauthentication: Optional; forwarded as ``disallow_reauthentication``.
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``post`` call returns.
    :rtype: requests.Response
    """
    optional_settings = {
        'auth_type': auth_type,
        'resolve_aws_unique_ids': resolve_aws_unique_ids,
        'bound_ami_id': bound_ami_id,
        'bound_account_id': bound_account_id,
        'bound_region': bound_region,
        'bound_vpc_id': bound_vpc_id,
        'bound_subnet_id': bound_subnet_id,
        'bound_iam_role_arn': bound_iam_role_arn,
        'bound_iam_instance_profile_arn': bound_iam_instance_profile_arn,
        'bound_ec2_instance_id': bound_ec2_instance_id,
        'role_tag': role_tag,
        'bound_iam_principal_arn': bound_iam_principal_arn,
        'inferred_entity_type': inferred_entity_type,
        'inferred_aws_region': inferred_aws_region,
        'ttl': ttl,
        'max_ttl': max_ttl,
        'period': period,
        'policies': policies,
        'allow_instance_migration': allow_instance_migration,
        'disallow_reauthentication': disallow_reauthentication,
    }
    endpoint = utils.format_url('/v1/auth/{0}/role/{1}', mount_point, role)

    # The role name is unconditional; unset (None) options are dropped.
    payload = {'role': role}
    payload.update(utils.remove_nones(optional_settings))

    return self._adapter.post(url=endpoint, json=payload)
def read_role(self, role, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch the configuration of a previously registered role.

    Issues a GET against ``/v1/auth/{mount_point}/role/{role}``.

    :param role: Name of the role to read.
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url('/v1/auth/{0}/role/{1}', mount_point, role)
    return self._adapter.get(url=endpoint).get('data')
def list_roles(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Enumerate the roles registered with the method.

    Issues a LIST against ``/v1/auth/{mount_point}/roles``.

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/roles',
        mount_point=mount_point,
    )
    return self._adapter.list(url=endpoint).get('data')
def delete_role(self, role, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Remove a previously registered role.

    Issues a DELETE against ``/v1/auth/{mount_point}/role/{role}``.

    :param role: Name of the role to delete.
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``delete`` call returns.
    :rtype: requests.Response
    """
    endpoint = utils.format_url('/v1/auth/{0}/role/{1}', mount_point, role)
    result = self._adapter.delete(url=endpoint)
    return result
def iam_login(self, access_key, secret_key, session_token=None, header_value=None, role=None, use_token=True,
              region='us-east-1', mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch a token by logging in with a SigV4-signed request.

    A request is built with ``aws_utils.generate_sigv4_auth_request`` and signed with ``aws_utils.SigV4Auth``
    using the supplied credentials and region. Its method, plus the base64-encoded URL, headers (as a JSON
    object mapping each header name to a one-element list) and body, are posted to the login endpoint together
    with ``role``. Note that ``role`` is included in the payload even when it is None.

    :param access_key: AWS access key passed to the signer.
    :param secret_key: AWS secret key passed to the signer.
    :param session_token: Optional session token passed to the signer.
    :param header_value: Passed through to ``aws_utils.generate_sigv4_auth_request``.
    :param role: Name of the role against which the login is being attempted.
    :type role: str
    :param use_token: if True, uses the token in the response received from the auth request to set the "token"
        attribute on the the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute.
    :type use_token: bool
    :param region: AWS region passed to the signer.
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``login`` call returns.
    :rtype: requests.Response
    """
    def _b64_text(text):
        # Base64-encode a text value and hand it back as text for the JSON payload.
        return b64encode(text.encode('utf-8')).decode('utf-8')

    endpoint = utils.format_url('/v1/auth/{mount_point}/login', mount_point=mount_point)

    signed_request = aws_utils.generate_sigv4_auth_request(header_value=header_value)
    signer = aws_utils.SigV4Auth(access_key, secret_key, session_token, region)
    signer.add_auth(signed_request)

    # https://github.com/hashicorp/vault/blob/master/builtin/credential/aws/cli.go
    header_lists = {}
    for header_name in signed_request.headers:
        header_lists[header_name] = [signed_request.headers[header_name]]
    serialized_headers = json.dumps(header_lists)

    payload = {
        'iam_http_request_method': signed_request.method,
        'iam_request_url': _b64_text(signed_request.url),
        'iam_request_headers': _b64_text(serialized_headers),
        'iam_request_body': _b64_text(signed_request.body),
        'role': role,
    }
    return self._adapter.login(url=endpoint, use_token=use_token, json=payload)
def ec2_login(self, pkcs7, nonce=None, role=None, use_token=True, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Retrieve a Vault token using an AWS authentication method mount's EC2 role.

    Posts ``pkcs7`` to ``/v1/auth/{mount_point}/login``; ``nonce`` and ``role`` are added to the payload only
    when they are truthy.

    :param pkcs7: PKCS7 signature of the identity document with all newline characters removed.
    :type pkcs7: str
    :param nonce: The nonce to be used for subsequent login requests.
    :type nonce: str
    :param role: Name of the role against which the login is being attempted.
    :type role: str
    :param use_token: if True, uses the token in the response received from the auth request to set the "token"
        attribute on the the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute.
    :type use_token: bool
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``login`` call returns.
    :rtype: requests.Response
    """
    payload = {'pkcs7': pkcs7}
    # Empty or None optional values are left out of the request entirely.
    for field, value in (('nonce', nonce), ('role', role)):
        if value:
            payload[field] = value

    endpoint = utils.format_url('/v1/auth/{mount_point}/login', mount_point=mount_point)
    return self._adapter.login(url=endpoint, use_token=use_token, json=payload)
def read_role_tag_blacklist(self, role_tag, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch the blacklist entry of a previously blacklisted role tag.

    Issues a GET against ``/v1/auth/{mount_point}/roletag-blacklist/{role_tag}``.

    :param role_tag: The role tag whose blacklist entry is read.
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{0}/roletag-blacklist/{1}',
        mount_point,
        role_tag,
    )
    return self._adapter.get(url=endpoint).get('data')
def read_identity_whitelist(self, instance_id, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Fetch an entry in the whitelist. An entry will be created/updated by every successful login.

    Issues a GET against ``/v1/auth/{mount_point}/identity-whitelist/{instance_id}``.

    :param instance_id: The instance ID whose whitelist entry is read.
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{0}/identity-whitelist/{1}',
        mount_point,
        instance_id,
    )
    return self._adapter.get(url=endpoint).get('data')
def list_identity_whitelist(self, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Enumerate the instance IDs that are in the whitelist of successful logins.

    Issues a LIST against ``/v1/auth/{mount_point}/identity-whitelist``.

    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: The value under the ``data`` key of the adapter's response.
    :rtype: dict
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/identity-whitelist',
        mount_point=mount_point,
    )
    return self._adapter.list(url=endpoint).get('data')
def delete_identity_whitelist_entries(self, instance_id, mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Remove the cached record of a successful login from an instance.

    Issues a DELETE against ``/v1/auth/{mount_point}/identity-whitelist/{instance_id}``.

    :param instance_id: The instance ID whose whitelist entry is deleted.
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``delete`` call returns.
    :rtype: requests.Response
    """
    endpoint = utils.format_url(
        '/v1/auth/{0}/identity-whitelist/{1}',
        mount_point,
        instance_id,
    )
    result = self._adapter.delete(url=endpoint)
    return result
def tidy_identity_whitelist_entries(self, saftey_buffer='72h', mount_point=AWS_DEFAULT_MOUNT_POINT):
    """Clean up the entries in the whitelist based on expiration time and safety_buffer.

    Posts to ``/v1/auth/{mount_point}/tidy/identity-whitelist``.

    :param saftey_buffer: Value sent as the request's ``safety_buffer`` field. The parameter name keeps its
        historical spelling so existing keyword callers continue to work.
    :param mount_point: The path the AWS auth method was mounted on.
    :type mount_point: str
    :return: Whatever the adapter's ``post`` call returns.
    :rtype: requests.Response
    """
    endpoint = utils.format_url(
        '/v1/auth/{mount_point}/tidy/identity-whitelist',
        mount_point=mount_point,
    )
    payload = {'safety_buffer': saftey_buffer}
    return self._adapter.post(url=endpoint, json=payload)