# -*- coding: utf-8 -*-
"""RestSession class for creating connections to the DNA Center APIs.
Copyright (c) 2019-2021 Cisco Systems.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import errno
import logging
import os
import re
import socket
import time
import urllib.parse
import warnings
from builtins import *
import requests
from requests.packages.urllib3.response import HTTPResponse
from requests_toolbelt.multipart import encoder
from .config import (
DEFAULT_SINGLE_REQUEST_TIMEOUT,
DEFAULT_VERIFY,
DEFAULT_WAIT_ON_RATE_LIMIT,
)
from .exceptions import (
ApiError,
DownloadFailure,
RateLimitError,
RateLimitWarning,
dnacentersdkException,
)
from .response_codes import EXPECTED_RESPONSE_CODE
from .utils import (
check_response_code,
check_type,
extract_and_parse_json,
pprint_request_info,
pprint_response_info,
validate_base_url,
)
logger = logging.getLogger(__name__)
[docs]class DownloadResponse(HTTPResponse):
"""Download Response wrapper.
Bases: urllib3.response.HTTPResponse. For more
information check the `urlib3 documentation <https://urllib3.readthedocs.io/en/latest/reference/urllib3.response.html>`_
HTTP Response container.
"""
def __init__(self, response, path, filename, dirpath, collected_data):
"""
Creates a new DownloadResponse object.
By calling urllib3.response.HTTPResponse __init__ method
with the raw property of requests.Response,
it recovers data from the API response
It adds properties regarding the download: filename, dirpath and path
Args:
response(requests.Response): The Response object, which contains a server's
response to an HTTP request.
path(str): The downloaded file path.
filename(str): The downloaded filename.
dirpath(str): The download directory path.
collected_data(bytes): HTTP response's data.
"""
super(DownloadResponse, self).__init__(
body=response.raw,
headers=response.headers,
status=response.status_code,
reason=response.reason,
request_method=response.request.method,
request_url=response.request.url,
)
# adds additional and important information
self._filename = filename
self._dirpath = dirpath
self._path = path
self._collected_data = collected_data
@property
def data(self):
"""The HTTPResponse's data"""
# Call HTTPResponse's data property
original_data = super(DownloadResponse, self).data
# It uses the one that has value prioritizing the HTTPResponse's data
return original_data or self._collected_data
@property
def filename(self):
"""The downloaded filename"""
return self._filename
@property
def dirpath(self):
"""The downloaded directory path"""
return self._dirpath
@property
def path(self):
"""The download file path"""
return self._path
# Main module interface
class RestSession(object):
"""RESTful HTTP session class for making calls to the DNA Center APIs."""
def __init__(
self,
get_access_token,
base_url,
single_request_timeout=DEFAULT_SINGLE_REQUEST_TIMEOUT,
wait_on_rate_limit=DEFAULT_WAIT_ON_RATE_LIMIT,
session=None,
verify=DEFAULT_VERIFY,
version=None,
debug=False,
user_agent=None,
):
"""Initialize a new RestSession object.
Args:
get_access_token(callable): The DNA Center method to get a new
access token.
base_url(str): The base URL that will be suffixed onto API
endpoint relative URLs to produce a callable absolute URL.
single_request_timeout(int): The timeout (seconds) for a single
HTTP REST API request.
wait_on_rate_limit(bool): Enable or disable automatic rate-limit
handling.
session(requests.Session): Optionally inject a `requests.Session`
object to be used for HTTP operations.
verify(bool,str): Controls whether we verify the server's
TLS certificate, or a string, in which case it must be a path
to a CA bundle to use.
version(str): Controls which version of DNA_CENTER to use.
Defaults to dnacentersdk.config.DNA_CENTER_VERSION
debug(bool,str): Controls whether to log information about
DNA Center APIs' request and response process.
Defaults to the DEBUG environment variable or False
if the environment variable is not set.
Raises:
TypeError: If the parameter types are incorrect.
"""
check_type(base_url, str, may_be_none=False)
check_type(single_request_timeout, int)
check_type(wait_on_rate_limit, bool, may_be_none=False)
check_type(verify, (bool, str), may_be_none=False)
check_type(version, str, may_be_none=False)
check_type(debug, (bool), may_be_none=False)
check_type(user_agent, str, may_be_none=False)
super(RestSession, self).__init__()
# Initialize attributes and properties
self._base_url = str(validate_base_url(base_url))
self._get_access_token = get_access_token
self._access_token = None
self._single_request_timeout = single_request_timeout
self._wait_on_rate_limit = wait_on_rate_limit
self._verify = verify
self._version = version
self._debug = debug
self._user_agent = user_agent
self._authenticated = False # Flag to track if we've authenticated
if debug:
logger.setLevel(logging.DEBUG)
logger.propagate = True
else:
logger.setLevel(logging.INFO)
if verify is False:
requests.packages.urllib3.disable_warnings()
# Use the injected `requests` session, build a new one if not provided
self._req_session = session or requests.session()
user_agent_suffix = ""
if user_agent is not None and user_agent != "":
user_agent_suffix = "-" + user_agent
# Initialize headers without X-Auth-Token for lazy authentication
headers = {
"Content-type": "application/json;charset=utf-8",
"User-Agent": f"python-cisco-dnacsdk/{version}{user_agent_suffix}",
}
self.update_headers(headers)
@property
def version(self):
"""The API version of DNA Center."""
return self._version
@property
def user_agent(self):
"""The API user agent."""
return self._user_agent
@property
def verify(self):
"""The verify (TLS Certificate) for the API endpoints."""
return self._verify
@verify.setter
def verify(self, value):
"""The verify (TLS Certificate) for the API endpoints."""
check_type(value, (bool, str), may_be_none=False)
self._verify = value
@property
def base_url(self):
"""The base URL for the API endpoints."""
return self._base_url
@base_url.setter
def base_url(self, value):
"""The base URL for the API endpoints."""
check_type(value, str, may_be_none=False)
self._base_url = str(validate_base_url(value))
@property
def access_token(self):
"""The DNA Center access token used for this session."""
if not self._authenticated and self._get_access_token:
self._ensure_authenticated()
return self._access_token
@property
def single_request_timeout(self):
"""The timeout (seconds) for a single HTTP REST API request."""
return self._single_request_timeout
@single_request_timeout.setter
def single_request_timeout(self, value):
"""The timeout (seconds) for a single HTTP REST API request."""
check_type(value, int)
assert value is None or value > 0
self._single_request_timeout = value
@property
def wait_on_rate_limit(self):
"""Automatic rate-limit handling.
This setting enables or disables automatic rate-limit handling. When
enabled, rate-limited requests will be automatically be retried after
waiting `Retry-After` seconds (provided by DNA Center in the
rate-limit response header).
"""
return self._wait_on_rate_limit
@wait_on_rate_limit.setter
def wait_on_rate_limit(self, value):
"""Enable or disable automatic rate-limit handling."""
check_type(value, bool, may_be_none=False)
self._wait_on_rate_limit = value
@property
def headers(self):
"""The HTTP headers used for requests in this session."""
return self._req_session.headers.copy()
@property
def debug(self):
"""The DNA Center access token used for this session."""
return self._debug
@property
def authenticated(self):
"""Flag to track if we've authenticated."""
return self._authenticated
def update_headers(self, headers):
"""Update the HTTP headers used for requests in this session.
Note: Updates provided by the dictionary passed as the `headers`
parameter to this method are merged into the session headers by adding
new key-value pairs and/or updating the values of existing keys. The
session headers are not replaced by the provided dictionary.
Args:
headers(dict): Updates to the current session headers.
"""
check_type(headers, dict, may_be_none=False)
self._req_session.headers.update(headers)
def _ensure_authenticated(self):
"""Ensure that we have a valid access token.
If we don't have an access token or haven't authenticated yet,
call the get_access_token method to authenticate.
"""
if not self._authenticated or not self._access_token:
self._access_token = self._get_access_token()
self.update_headers({"X-Auth-Token": self._access_token})
self._authenticated = True
def refresh_token(self):
"""Call the get_access_token method and update the session's
auth header with the new token.
"""
self._access_token = self._get_access_token()
self.update_headers({"X-Auth-Token": self.access_token})
self._authenticated = True
def abs_url(self, url):
"""Given a relative or absolute URL; return an absolute URL.
Args:
url(str): A relative or absolute URL.
Returns:
str: An absolute URL.
"""
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.scheme and not parsed_url.netloc:
# url is a relative URL; combine with base_url
return urllib.parse.urljoin(str(self.base_url), str(url))
else:
# url is already an absolute URL; return as is
return url
def get_filename(self, content):
"""Get the filename from the Content-Disposition's header
Args:
content(str): the Content-Disposition's header
Returns:
str: the filename from the Content-Disposition's header
Raises:
Exception: If was not able to find the header's filename value.
"""
content_file_list = re.findall("filename=(.*)", content)
if len(content_file_list) > 0:
content_file_name = content_file_list[0].replace('"', "")
else:
raise Exception("Could not find the header's filename value")
return content_file_name
def download(self, method, url, erc, custom_refresh, **kwargs):
"""It immediately downloads the response content.
Args:
method(str): The request-method type ('GET', 'POST', etc.).
url(str): The URL of the API endpoint to be called.
erc(int): The expected response code that should be returned by the
DNA Center API endpoint to indicate success.
**kwargs: Passed on to the requests package.
To download it to a file use the `save_file` kwarg equal to True.
It defaults to False. If False is only 'downloaded' to a data property.
To specify the downloaded file use the `filename` kwarg.
It defaults to the value of the Content-Disposition header's filename.
To specify the downloaded directory path use the `dirpath` kwarg.
It defaults to the os.getcwd() result.
Returns:
DownloadResponse: The DownloadResponse wrapper. Wraps the urllib3.response.HTTPResponse. For more
information check the `urlib3 documentation <https://urllib3.readthedocs.io/en/latest/reference/urllib3.response.html>`_
Raises:
DownloadFailure: If was not able to download the raw
response to a file.
"""
save_file = kwargs.pop("save_file", False)
dirpath = kwargs.pop("dirpath", None)
filename = kwargs.pop("filename", None)
filepath = None
collected_data = bytes()
if not (dirpath) or not (os.path.isdir(dirpath)):
dirpath = os.getcwd()
with self.request(method, url, erc, 0, **kwargs) as resp:
if resp.headers and resp.headers.get("Content-Disposition"):
try:
content = resp.headers.get("Content-Disposition")
filename = filename or self.get_filename(content)
filepath = os.path.join(dirpath, filename)
except Exception as e:
raise DownloadFailure(resp, e)
if save_file and filepath:
try:
with open(filepath, "wb") as f:
logger.debug("Downloading {0}".format(filepath))
for chunk in resp.iter_content(chunk_size=1024):
if chunk:
collected_data += chunk
f.write(chunk)
except Exception as e:
raise DownloadFailure(resp, e)
logger.debug("Downloaded {0}".format(filepath))
final_response = DownloadResponse(
resp, filepath, filename, dirpath, collected_data
)
return final_response
def request(self, method, url, erc, custom_refresh, json_null=True, **kwargs):
"""Abstract base method for making requests to the DNA Center APIs.
This base method:
* Expands the API endpoint URL to an absolute URL
* Makes the actual HTTP request to the API endpoint
* Provides support for DNA Center rate-limiting
* Inspects response codes and raises exceptions as appropriate
* Updates the token if response code is 401 - Unauthorized
and makes the request to the API endpoint again
Args:
method(str): The request-method type ('GET', 'POST', etc.).
url(str): The URL of the API endpoint to be called.
erc(int): The expected response code that should be returned by the
DNA Center API endpoint to indicate success.
json_null(bool): Validate if the json should be blank.
**kwargs: Passed on to the requests package.
Returns:
requests.Response: The Response object, which contains a server's response to an HTTP request.
Raises:
ApiError: If anything other than the expected response code is
returned by the DNA Center API endpoint.
"""
# Ensure we are authenticated before making any request
self._ensure_authenticated()
# Ensure the url is an absolute URL
abs_url = self.abs_url(url)
# Update request kwargs with session defaults
kwargs.setdefault("timeout", self.single_request_timeout)
kwargs.setdefault("verify", self.verify)
# Fixes requests inconsistent behavior with additional parameters
if json_null:
if not kwargs.get("json"):
kwargs.pop("json", None)
if not kwargs.get("data"):
kwargs.pop("data", None)
c = custom_refresh
while True:
c += 1
# Make the HTTP request to the API endpoint
try:
logger.debug("Attempt {}".format(c))
logger.debug(
pprint_request_info(
abs_url, method, _headers=self.headers, **kwargs
)
)
response = self._req_session.request(method, abs_url, **kwargs)
except socket.error:
# A socket error
try:
c += 1
logger.debug("Attempt {}".format(c))
response = self._req_session.request(method, abs_url, **kwargs)
except Exception as e:
raise dnacentersdkException("Socket error {}".format(e))
except IOError as e:
if e.errno == errno.EPIPE:
# EPIPE error
try:
c += 1
logger.debug("Attempt {}".format(c))
response = self._req_session.request(method, abs_url, **kwargs)
except Exception as e:
raise dnacentersdkException("PipeError {}".format(e))
else:
raise dnacentersdkException("IOError {}".format(e))
try:
# Check the response code for error conditions
check_response_code(response, erc)
except RateLimitError as e:
# Catch rate-limit errors
# Wait and retry if automatic rate-limit handling is enabled
if self.wait_on_rate_limit:
warnings.warn(RateLimitWarning(response))
time.sleep(e.retry_after)
continue
else:
# Re-raise the RateLimitError
raise
except ApiError as e:
if e.status_code == 401 and custom_refresh < 1:
logger.debug(pprint_response_info(response))
logger.debug("Refreshing access token")
self.refresh_token()
logger.debug("Refreshed token.")
return self.request(method, url, erc, 1, **kwargs)
else:
# Re-raise the ApiError
logger.debug(pprint_response_info(response))
raise
else:
logger.debug(pprint_response_info(response))
return response
def multipart_data(self, fields, create_callback):
"""Creates a multipart/form-data body.
Args:
fields(dict,list): form data values.
create_callback(function): function that creates a function that
monitors the progress of the upload.
boundary: MultipartEncoder's boundary.
Default value: None.
encoding(string): MultipartEncoder's encoding.
Default value: utf-8.
"""
if fields is not None:
e = encoder.MultipartEncoder(fields=fields)
if create_callback is not None:
callback = create_callback(e)
m = encoder.MultipartEncoderMonitor(e, callback)
return m
else:
return e
else:
return None
def get(self, url, params=None, **kwargs):
"""Sends a GET request.
Args:
url(str): The URL of the API endpoint.
params(dict): The parameters for the HTTP GET request.
**kwargs:
erc(int): The expected (success) response code for the request.
others: Passed on to the requests package.
Returns:
DownloadResponse: If it has `stream` kwarg with a True value.
Any: Result of the `json.loads` of the server's response to an HTTP request.
Raises:
ApiError: If anything other than the expected response code is
returned by the DNA Center API endpoint.
"""
check_type(url, str, may_be_none=False)
check_type(params, dict)
# Expected response code
erc = kwargs.pop("erc", EXPECTED_RESPONSE_CODE["GET"])
stream = kwargs.get("stream", None)
if stream:
return self.download("GET", url, erc, 0, params=params, **kwargs)
else:
response = self.request("GET", url, erc, 0, params=params, **kwargs)
# Handle No Content (204) responses
if response.status_code == 204:
return None
return extract_and_parse_json(response)
def patch(self, url, params=None, json=None, data=None, **kwargs):
"""Sends a PATCH request.
Args:
url(str): The URL of the API endpoint.
json: Data to be sent in JSON format in tbe body of the request.
data: Data to be sent in the body of the request.
**kwargs:
erc(int): The expected (success) response code for the request.
others: Passed on to the requests package.
Returns:
DownloadResponse: If it has `stream` kwarg with a True value.
Any: Result of the `json.loads` of the server's response to an HTTP request.
Raises:
ApiError: If anything other than the expected response code is
returned by the DNA Center API endpoint.
"""
check_type(url, str, may_be_none=False)
check_type(params, dict)
# Expected response code
erc = kwargs.pop("erc", EXPECTED_RESPONSE_CODE["PATCH"])
stream = kwargs.get("stream", None)
if stream:
return self.download(
"PATCH", url, erc, 0, params=params, json=json, data=data, **kwargs
)
else:
response = self.request(
"PATCH", url, erc, 0, params=params, json=json, data=data, **kwargs
)
return extract_and_parse_json(response)
def post(self, url, params=None, json=None, data=None, **kwargs):
"""Sends a POST request.
Args:
url(str): The URL of the API endpoint.
json: Data to be sent in JSON format in tbe body of the request.
data: Data to be sent in the body of the request.
**kwargs:
erc(int): The expected (success) response code for the request.
others: Passed on to the requests package.
Returns:
DownloadResponse: If it has `stream` kwarg with a True value.
Any: Result of the `json.loads` of the server's response to an HTTP request.
Raises:
ApiError: If anything other than the expected response code is
returned by the DNA Center API endpoint.
"""
check_type(url, str, may_be_none=False)
check_type(params, dict)
# Expected response code
erc = kwargs.pop("erc", EXPECTED_RESPONSE_CODE["POST"])
stream = kwargs.get("stream", None)
if stream:
return self.download(
"POST", url, erc, 0, params=params, json=json, data=data, **kwargs
)
else:
response = self.request(
"POST", url, erc, 0, params=params, json=json, data=data, **kwargs
)
return extract_and_parse_json(response)
def put(self, url, params=None, json=None, data=None, **kwargs):
"""Sends a PUT request.
Args:
url(str): The URL of the API endpoint.
json: Data to be sent in JSON format in tbe body of the request.
data: Data to be sent in the body of the request.
**kwargs:
erc(int): The expected (success) response code for the request.
others: Passed on to the requests package.
Returns:
DownloadResponse: If it has `stream` kwarg with a True value.
Any: Result of the `json.loads` of the server's response to an HTTP request.
Raises:
ApiError: If anything other than the expected response code is
returned by the DNA Center API endpoint.
"""
check_type(url, str, may_be_none=False)
check_type(params, dict)
# Expected response code
erc = kwargs.pop("erc", EXPECTED_RESPONSE_CODE["PUT"])
stream = kwargs.get("stream", None)
if stream:
return self.download(
"PUT", url, erc, 0, params=params, json=json, data=data, **kwargs
)
else:
response = self.request(
"PUT", url, erc, 0, params=params, json=json, data=data, **kwargs
)
return extract_and_parse_json(response)
def delete(self, url, params=None, **kwargs):
"""Sends a DELETE request.
Args:
url(str): The URL of the API endpoint.
**kwargs:
erc(int): The expected (success) response code for the request.
others: Passed on to the requests package.
Raises:
ApiError: If anything other than the expected response code is
returned by the DNA Center API endpoint.
"""
check_type(url, str, may_be_none=False)
check_type(params, dict)
# Expected response code
erc = kwargs.pop("erc", EXPECTED_RESPONSE_CODE["DELETE"])
response = self.request("DELETE", url, erc, 0, params=params, **kwargs)
return extract_and_parse_json(response)
def close(self):
"""Close the underlying requests session.
This method closes the underlying requests session, which will close
any open HTTP connections and free up resources.
Example:
session = RestSession(...)
# ... use session for API calls ...
session.close() # Close all connections
"""
if hasattr(self, "_req_session") and self._req_session:
self._req_session.close()
self._req_session = None
def __del__(self):
"""Destructor that ensures the session is closed when the object is garbage collected."""
self.close()