"""
HTTP Client Library Adapters
"""
from abc import ABCMeta, abstractmethod
import requests
import requests.exceptions
from hvac import utils
from hvac.constants.client import DEFAULT_URL
[docs]class Adapter(metaclass=ABCMeta):
"""Abstract base class used when constructing adapters for use with the Client class."""
[docs] @classmethod
def from_adapter(
cls,
adapter,
):
"""Create a new adapter based on an existing Adapter instance.
This can be used to create a new type of adapter that inherits the properties of an existing one.
:param adapter: The existing Adapter instance.
:type adapter: hvac.Adapters.Adapter
"""
return cls(
base_uri=adapter.base_uri,
token=adapter.token,
cert=adapter._kwargs.get("cert"),
verify=adapter._kwargs.get("verify"),
timeout=adapter._kwargs.get("timeout"),
proxies=adapter._kwargs.get("proxies"),
allow_redirects=adapter.allow_redirects,
session=adapter.session,
namespace=adapter.namespace,
ignore_exceptions=adapter.ignore_exceptions,
strict_http=adapter.strict_http,
request_header=adapter.request_header,
)
[docs] def __init__(
self,
base_uri=DEFAULT_URL,
token=None,
cert=None,
verify=True,
timeout=30,
proxies=None,
allow_redirects=True,
session=None,
namespace=None,
ignore_exceptions=False,
strict_http=False,
request_header=True,
):
"""Create a new request adapter instance.
:param base_uri: Base URL for the Vault instance being addressed.
:type base_uri: str
:param token: Authentication token to include in requests sent to Vault.
:type token: str
:param cert: Certificates for use in requests sent to the Vault instance. This should be a tuple with the
certificate and then key.
:type cert: tuple
:param verify: Either a boolean to indicate whether TLS verification should be performed when sending requests to Vault,
or a string pointing at the CA bundle to use for verification. See http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification.
:type verify: Union[bool,str]
:param timeout: The timeout value for requests sent to Vault.
:type timeout: int
:param proxies: Proxies to use when preforming requests.
See: http://docs.python-requests.org/en/master/user/advanced/#proxies
:type proxies: dict
:param allow_redirects: Whether to follow redirects when sending requests to Vault.
:type allow_redirects: bool
:param session: Optional session object to use when performing request.
:type session: request.Session
:param namespace: Optional Vault Namespace.
:type namespace: str
:param ignore_exceptions: If True, _always_ return the response object for a given request. I.e., don't raise an exception
based on response status code, etc.
:type ignore_exceptions: bool
:param strict_http: If True, use only standard HTTP verbs in request with additional params, otherwise process as is
:type strict_http: bool
:param request_header: If true, add the X-Vault-Request header to all requests to protect against SSRF vulnerabilities.
:type request_header: bool
"""
if not session:
session = requests.Session()
session.cert, session.verify, session.proxies = cert, verify, proxies
# fix for issue 991 using session verify if set
else:
if session.verify:
# need to set the variable and not assign it to self so it is properly passed in kwargs
verify = session.verify
if session.cert:
cert = session.cert
if session.proxies:
proxies = session.proxies
self.base_uri = base_uri
self.token = token
self.namespace = namespace
self.session = session
self.allow_redirects = allow_redirects
self.ignore_exceptions = ignore_exceptions
self.strict_http = strict_http
self.request_header = request_header
self._kwargs = {
"cert": cert,
"verify": verify,
"timeout": timeout,
"proxies": proxies,
}
[docs] @staticmethod
def urljoin(*args):
"""Joins given arguments into a url. Trailing and leading slashes are stripped for each argument.
:param args: Multiple parts of a URL to be combined into one string.
:type args: str | unicode
:return: Full URL combining all provided arguments
:rtype: str | unicode
"""
return "/".join(map(lambda x: str(x).strip("/"), args))
[docs] def close(self):
"""Close the underlying Requests session."""
self.session.close()
[docs] def get(self, url, **kwargs):
"""Performs a GET request.
:param url: Partial URL path to send the request to. This will be joined to the end of the instance's base_uri
attribute.
:type url: str | unicode
:param kwargs: Additional keyword arguments to include in the requests call.
:type kwargs: dict
:return: The response of the request.
:rtype: requests.Response
"""
return self.request("get", url, **kwargs)
[docs] def post(self, url, **kwargs):
"""Performs a POST request.
:param url: Partial URL path to send the request to. This will be joined to the end of the instance's base_uri
attribute.
:type url: str | unicode
:param kwargs: Additional keyword arguments to include in the requests call.
:type kwargs: dict
:return: The response of the request.
:rtype: requests.Response
"""
return self.request("post", url, **kwargs)
[docs] def put(self, url, **kwargs):
"""Performs a PUT request.
:param url: Partial URL path to send the request to. This will be joined to the end of the instance's base_uri
attribute.
:type url: str | unicode
:param kwargs: Additional keyword arguments to include in the requests call.
:type kwargs: dict
:return: The response of the request.
:rtype: requests.Response
"""
return self.request("put", url, **kwargs)
[docs] def delete(self, url, **kwargs):
"""Performs a DELETE request.
:param url: Partial URL path to send the request to. This will be joined to the end of the instance's base_uri
attribute.
:type url: str | unicode
:param kwargs: Additional keyword arguments to include in the requests call.
:type kwargs: dict
:return: The response of the request.
:rtype: requests.Response
"""
return self.request("delete", url, **kwargs)
[docs] def list(self, url, **kwargs):
"""Performs a LIST request.
:param url: Partial URL path to send the request to. This will be joined to the end of the instance's base_uri
attribute.
:type url: str | unicode
:param kwargs: Additional keyword arguments to include in the requests call.
:type kwargs: dict
:return: The response of the request.
:rtype: requests.Response
"""
return self.request("list", url, **kwargs)
[docs] def head(self, url, **kwargs):
"""Performs a HEAD request.
:param url: Partial URL path to send the request to. This will be joined to the end of the instance's base_uri
attribute.
:type url: str | unicode
:param kwargs: Additional keyword arguments to include in the requests call.
:type kwargs: dict
:return: The response of the request.
:rtype: requests.Response
"""
return self.request("head", url, **kwargs)
[docs] def login(self, url, use_token=True, **kwargs):
"""Perform a login request.
Associated request is typically to a path prefixed with "/v1/auth") and optionally stores the client token sent
in the resulting Vault response for use by the :py:meth:`hvac.adapters.Adapter` instance under the _adapter
Client attribute.
:param url: Path to send the authentication request to.
:type url: str | unicode
:param use_token: if True, uses the token in the response received from the auth request to set the "token"
attribute on the the :py:meth:`hvac.adapters.Adapter` instance under the _adapter Client attribute.
:type use_token: bool
:param kwargs: Additional keyword arguments to include in the params sent with the request.
:type kwargs: dict
:return: The response of the auth request.
:rtype: requests.Response
"""
response = self.post(url, **kwargs)
if use_token:
self.token = self.get_login_token(response)
return response
[docs] @abstractmethod
def get_login_token(self, response):
"""Extracts the client token from a login response.
:param response: The response object returned by the login method.
:return: A client token.
:rtype: str
"""
return NotImplementedError
[docs] @abstractmethod
def request(self, method, url, headers=None, raise_exception=True, **kwargs):
"""Main method for routing HTTP requests to the configured Vault base_uri. Intended to be implement by subclasses.
:param method: HTTP method to use with the request. E.g., GET, POST, etc.
:type method: str
:param url: Partial URL path to send the request to. This will be joined to the end of the instance's base_uri
attribute.
:type url: str | unicode
:param headers: Additional headers to include with the request.
:type headers: dict
:param kwargs: Additional keyword arguments to include in the requests call.
:type kwargs: dict
:param raise_exception: If True, raise an exception via utils.raise_for_error(). Set this parameter to False to
bypass this functionality.
:type raise_exception: bool
:return: The response of the request.
:rtype: requests.Response
"""
raise NotImplementedError
[docs]class RawAdapter(Adapter):
"""
The RawAdapter adapter class.
This adapter adds Vault-specific headers as required and optionally raises exceptions on errors,
but always returns Response objects for requests.
"""
def _raise_for_error(self, method: str, url: str, response: requests.Response):
msg = json = text = errors = None
try:
text = response.text
except Exception:
pass
if response.headers.get("Content-Type") == "application/json":
try:
json = response.json()
except Exception:
pass
else:
errors = json.get("errors")
if errors is None:
msg = text
utils.raise_for_error(
method,
url,
response.status_code,
msg,
errors=errors,
text=text,
json=json,
)
[docs] def get_login_token(self, response):
"""Extracts the client token from a login response.
:param response: The response object returned by the login method.
:type response: requests.Response
:return: A client token.
:rtype: str
"""
response_json = response.json()
return response_json["auth"]["client_token"]
[docs] def request(self, method, url, headers=None, raise_exception=True, **kwargs):
"""Main method for routing HTTP requests to the configured Vault base_uri.
:param method: HTTP method to use with the request. E.g., GET, POST, etc.
:type method: str
:param url: Partial URL path to send the request to. This will be joined to the end of the instance's base_uri
attribute.
:type url: str | unicode
:param headers: Additional headers to include with the request.
:type headers: dict
:param raise_exception: If True, raise an exception via utils.raise_for_error(). Set this parameter to False to
bypass this functionality.
:type raise_exception: bool
:param kwargs: Additional keyword arguments to include in the requests call.
:type kwargs: dict
:return: The response of the request.
:rtype: requests.Response
"""
while "//" in url:
# Vault CLI treats a double forward slash ('//') as a single forward slash for a given path.
# To avoid issues with the requests module's redirection logic, we perform the same translation here.
url = url.replace("//", "/")
url = self.urljoin(self.base_uri, url)
if not headers:
headers = {}
if self.request_header:
headers["X-Vault-Request"] = "true"
if self.token:
headers["X-Vault-Token"] = self.token
if self.namespace:
headers["X-Vault-Namespace"] = self.namespace
wrap_ttl = kwargs.pop("wrap_ttl", None)
if wrap_ttl:
headers["X-Vault-Wrap-TTL"] = str(wrap_ttl)
_kwargs = self._kwargs.copy()
_kwargs.update(kwargs)
if self.strict_http and method.lower() in ("list",):
# Entry point for standard HTTP substitution
params = _kwargs.get("params", {})
if method.lower() == "list":
method = "get"
params.update({"list": "true"})
_kwargs["params"] = params
response = self.session.request(
method=method,
url=url,
headers=headers,
allow_redirects=self.allow_redirects,
**_kwargs
)
if not response.ok and (raise_exception and not self.ignore_exceptions):
self._raise_for_error(method, url, response)
return response
[docs]class JSONAdapter(RawAdapter):
"""
The JSONAdapter adapter class.
This adapter works just like the RawAdapter adapter except that HTTP 200 responses are returned as JSON dicts.
All non-200 responses are returned as Response objects.
"""
[docs] def get_login_token(self, response):
"""Extracts the client token from a login response.
:param response: The response object returned by the login method.
:type response: dict | requests.Response
:return: A client token.
:rtype: str
"""
return response["auth"]["client_token"]
[docs] def request(self, *args, **kwargs):
"""Main method for routing HTTP requests to the configured Vault base_uri.
:param args: Positional arguments to pass to RawAdapter.request.
:type args: list
:param kwargs: Keyword arguments to pass to RawAdapter.request.
:type kwargs: dict
:return: Dict on HTTP 200 with JSON body, otherwise the response object.
:rtype: dict | requests.Response
"""
response = super().request(*args, **kwargs)
if response.status_code == 200:
try:
return response.json()
except ValueError:
pass
return response
# Retaining the legacy name
Request = RawAdapter