Module max_ard.session
Authenticated sessions for communicating with the ARD API endpoints
Expand source code
""" Authenticated sessions for communicating with the ARD API endpoints"""
import base64
import json
import os
import warnings
from configparser import ConfigParser
from platform import python_version, system
from oauthlib.oauth2 import BackendApplicationClient, LegacyApplicationClient
from oauthlib.oauth2.rfc6749.errors import MissingTokenError
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from requests.packages.urllib3.util.retry import Retry
from requests_oauthlib import OAuth2Session
from max_ard.exceptions import (
ard_server_request_hook,
bad_ard_request_hook,
bad_geom_request_hook,
missing_resource_hook,
oversize_request_hook,
unauth_request_hook,
)
try:
from importlib import metadata
except ImportError: # for Python<3.8
import importlib_metadata as metadata
__all__ = ("get_user_session", "get_client_session", "get_self")
SAVE_TOKEN = True # if false, never save the token back to the config file
ARD_TIMEOUT = os.environ.get(
"ARD_TIMEOUT", 30
) # seconds, this is high for big selects and dry run orders
def jwt_expires(token):
"""returns the expiration from a JWT token
Arguments
---------
token: str
JWT token to decode
Returns
-------
str: expiration time
Notes
-----
THIS DOES NOT VERIFY THE TOKEN"""
payload_segment = token.split(".")[1]
input = payload_segment.encode("ascii")
rem = len(input) % 4
if rem > 0:
input += b"=" * (4 - rem)
payload = base64.urlsafe_b64decode(input)
return json.loads(payload)["exp"]
# Timeout and Retry handling
# from https://findwork.dev/blog/advanced-usage-python-requests-timeouts-retries-hooks/
class TimeoutHTTPAdapter(HTTPAdapter):
"""HTTP Adapter that adds in default timeouts
Keywords
--------
timeout(numeric): seconds to wait before timing out
"""
def __init__(self, *args, **kwargs):
self.timeout = ARD_TIMEOUT
if "timeout" in kwargs:
self.timeout = kwargs["timeout"]
del kwargs["timeout"]
super().__init__(*args, **kwargs)
def send(self, request, **kwargs):
timeout = kwargs.get("timeout")
if timeout is None:
kwargs["timeout"] = self.timeout
return super().send(request, **kwargs)
def mount_adaptors(session):
"""Mount adaptors to a session for retries and update UA string
Arguments
---------
session: Requests session
"""
retries = Retry(total=3, backoff_factor=5, status_forcelist=[429, 503, 504])
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries))
version = metadata.version("max_ard")
session.headers.update(
{"User-Agent": f"max_ard/{version} (Python {python_version()}/{system()})"}
)
return session
def link_paginated_response(session, url, limit=None, **params):
"""Follow responses with paginated links and gather up objects
Newer paginated endpoints give you a link to retrieve the next set of objects.
For older endpoints that only return object IDs, see the next function.
Parameters
----------
session: Requests sessionj
A session through which to make the requests, usually an ARD session
url: str
URL to start fetching objects
limit: int or None, default None
Limit number of objects returned, None means unlimited
**params: any
Additional parameters to pass through to the session's GET method
Returns
-------
list: any
A list of objects returned by the endpoint"""
response = {"has_more": True}
things = []
# bump up the default fetch limit
params["limit"] = 100
while response["has_more"]:
response = session.get(url, params=params).json()
for thing in response["data"]:
things.append(thing)
if limit and len(things) == limit:
return things
# nothing returned
if len(things) == 0:
return []
if response["has_more"]:
url = response["links"]["next_page"]
return things
def paginated_response(session, url, key, limit=None, **params):
"""Follow paginated responses with object IDs and gather up objects
This uses `starting_after` to grab the next batch of objects after the last
object's ID.
Parameters
----------
session: Requests session
A session through which to make the requests, usually an ARD session
url: str
URL to start fetching objects
key: callable
A callable that given the object, returns the object's ID.
limit: int or None, default None
Limit number of objects returned, None means unlimited
**params: any
Additional parameters to pass through to the session's GET method
Returns
-------
list: any
A list of objects returned by the endpoint"""
response = {"has_more": True}
things = []
# bump up the default fetch limit
params["limit"] = 100
while response["has_more"]:
response = session.get(url, params=params).json()
for thing in response["data"]:
things.append(thing)
last_id = key(thing)
if limit and len(things) == limit:
return things
# nothing returned
if len(things) == 0:
return []
params["starting_after"] = last_id
return things
def ard_url(*args):
"""Get an ARD url built from given subfolders
This will build the url using hostname and dev feature branch envvars, if present.
Parameters
----------
*args: any
ARD API endpoint subfolders to concatenate into a url
Returns
-------
str: endpoint url
Examples
--------
No hostname or branch set returns prod urls:
ard_url('select', 'status') -> https://ard.maxar.com/api/v1/select/status
Hostname set:
MAXAR_ARD_HOSTNAME = ard-dev.maxar.com
ard_url('select', 'status') -> https://ard-dev.maxar.com/api/v1/select/status
Feature Branch (only applies to dev):
MAXAR_ARD_HOSTNAME = ard-dev.maxar.com
ARD_DEV_BRANCH = new_kml
ard_url('select', 'status') -> https://ard-dev.maxar.com/api/v1/select-new_kml/status
"""
hostname = os.environ.get("MAXAR_ARD_HOSTNAME", "ard.maxar.com/api/v1")
dev_branch = os.environ.get("ARD_DEV_BRANCH")
if dev_branch:
return f'https://{hostname}/{args[0]}-{dev_branch}/{"/".join(args[1:])}'
else:
return f'https://{hostname}/{args[0]}/{"/".join(args[1:])}'
AUTH_URL = ard_url("auth", "authenticate")
_USER_SESSION_CACHE = None
_CLIENT_SESSION_CACHE = {}
def get_self(session):
"""Fetch the account SELF endpoint of the current user
Parameters
----------
session : OAuth2Session
The session object created for the current user
Returns
-------
dict: SELF api payload"""
r = session.get(ard_url("auth", "self"))
r.raise_for_status()
return r.json()
def get_session():
warnings.warn(
"get_session() will be deprecated for the more specific get_user_session()",
DeprecationWarning,
)
return get_user_session()
def add_session_hooks(session):
"""
Add hooks for session object, and return it.
Note: Order of Hooks is important.
"""
session.hooks["response"] = [
# Specific 400 errors
bad_geom_request_hook,
# Generic 400 errors
bad_ard_request_hook,
# 413 errors
oversize_request_hook,
# 403 errors
unauth_request_hook,
# 404 errors
missing_resource_hook,
# non-200 errors
ard_server_request_hook,
]
return session
def get_user_session():
"""Get an authenticated Requests session for communicating with the ARD API
Example
-------
>>> r = get_user_session()
>>> resp = r.get('https://ard-dev.geobigdata.io/api/v1/select/request/5542092796671610498')
Returns
-------
OAuth2Session
Authenticated Requests session
Notes
-----
For credentials, create a file in your home directory called .ard-config with these contents:
[ard]
user_name = <your_user_name>
user_password = <your_password>
or set these environment variables:
ARD_USERNAME
ARD_PASSWORD
"""
global _USER_SESSION_CACHE
if _USER_SESSION_CACHE is None:
_USER_SESSION_CACHE = _get_user_session()
return _USER_SESSION_CACHE
def _get_user_session(config_file=None):
"""Get an authenticated Requests session for communicating with the ARD API
Parameters
----------
config_file, optional
Path to config file
Returns
-------
OAuth2Session
Authenticated Requests session
Notes
-----
If you provide ARD_ACCESS_TOKEN and ARD_REFRESH_TOKEN via env vars it will
use those credentials. If you provide a path to a config
file, it will look there for the credentials. If you don't it will try to
pull the credentials from environment variables (ARD_USERNAME, ARD_PASSWORD).
If that fails and you have a '~/.ard-config' ini file, it will read from that.
"""
session = None
if os.environ.get("ARD_ACCESS_TOKEN", None):
session = session_from_existing_token(
access_token=os.environ.get("ARD_ACCESS_TOKEN", None),
refresh_token=os.environ.get("ARD_REFRESH_TOKEN", None),
)
# If no config file specified, try using environment variables. If that
# fails and there is a config in the default location, use that.
if not session and not config_file:
try:
session = session_from_envvars()
except MissingTokenError as e:
raise Exception("Invalid ARD credentials given in environment variables.")
except Exception as e:
config_file = os.path.expanduser("~/.ard-config")
example_credentials = """
[ard]
user_name = your_user_name
user_password = your_password
"""
if config_file and not os.path.isfile(config_file):
raise Exception(
f"Please create a ARD credential file at ~/.ard-config with these contents:\n{example_credentials}"
)
if not session:
try:
session = session_from_config(config_file)
except:
raise Exception(
"Invalid credentials or incorrectly formatted config file at ~/.ard-config"
)
session = mount_adaptors(session)
return add_session_hooks(session)
def get_client_session(client_id=None, client_secret=None):
"""Get an authenticated session using client credentials (Maxar internal only)
If a client ID and secret are not passed as kwargs, the env vars
ARD_CLIENT_ID and ARD_CLIENT_SECRET will be used if they exist."""
global _CLIENT_SESSION_CACHE
if _CLIENT_SESSION_CACHE[client_id] is None:
_CLIENT_SESSION_CACHE[client_id] = _get_client_session(
client_id=client_id, client_secret=client_secret
)
return _CLIENT_SESSION_CACHE
def _get_client_session(client_id=None, client_secret=None):
"""Get an authenticated session using client credentials (Maxar internal only)
If a client ID and secret are not passed as kwargs, the env vars
ARD_CLIENT_ID and ARD_CLIENT_SECRET will be used if they exist."""
token_url = f"{AUTH_URL}/oauth2/token"
if not client_id:
client_id = os.environ.get("ARD_CLIENT_ID")
if not client_secret:
client_secret = os.environ.get("ARD_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("Unable to find credentials")
auth = HTTPBasicAuth(client_id, client_secret)
client = BackendApplicationClient(client_id=client_id)
# set up our OAuth session
oauth = OAuth2Session(
client=client, scope=["com.maxar.ard/ard.search", "com.maxar.ard/ard.order"]
)
token = oauth.fetch_token(token_url=token_url, auth=auth)
return mount_adaptors(oauth)
def session_from_existing_token(access_token, refresh_token="no_refresh_token", auth_url=AUTH_URL):
"""Get a session using an existing access_token and refresh_token.
Parameters
----------
access_token : str
Access token
refresh_token : str
Refresh token
Returns
-------
OAuth2Session
Authenticated Requests session
"""
def save_token(token):
s.token = token
token = {
"token_type": "Bearer",
"refresh_token": refresh_token,
"access_token": access_token,
"scope": ["read", "write"],
"expires_in": 604800,
"expires_at": jwt_expires(access_token),
}
s = OAuth2Session(
token=token,
auto_refresh_url=auth_url,
token_updater=save_token,
)
return s
def session_from_envvars(
auth_url=AUTH_URL,
environ_template=(("username", "ARD_USERNAME"), ("password", "ARD_PASSWORD")),
):
"""Returns a session with the ARD authorization token baked in,
pulling the credentials from environment variables.
environ_template - An iterable of key, value pairs. The key should
be the variables used in the oauth workflow, and
the values being the environment variables to
pull the configuration from. Change the
template values if your envvars differ from the
default, but make sure the keys remain the same.
Returns
-------
OAuth2Session
Authenticated Requests session
"""
def save_token(token):
s.token = token
client_id = "dummyclientid"
client_secret = "dummyclientsecret"
environ = {var: os.environ[envvar] for var, envvar in environ_template}
s = OAuth2Session(
client=LegacyApplicationClient(client_id),
auto_refresh_url=auth_url,
auto_refresh_kwargs={"client_id": client_id, "client_secret": client_secret},
token_updater=save_token,
)
s.fetch_token(auth_url, **environ)
return s
def session_from_kwargs(**kwargs):
"""Get a session object using credentials from keywords
Parameters
----------
**kwargs : dict
Authentication keywords: username, password, client_id, client_secret
Returns
-------
OAuth2Session
Authenticated Requests session
"""
def save_token(token):
s.token = token
auth_url = AUTH_URL
s = OAuth2Session(
client=LegacyApplicationClient(kwargs.get("client_id")),
auto_refresh_url=auth_url,
auto_refresh_kwargs={
"client_id": kwargs.get("client_id"),
"client_secret": kwargs.get("client_secret"),
},
token_updater=save_token,
)
try:
s.fetch_token(
auth_url,
username=kwargs.get("username"),
password=kwargs.get("password"),
client_id=kwargs.get("client_id"),
client_secret=kwargs.get("client_secret"),
)
except MissingTokenError as e:
raise Exception("Invalid credentials passed into session_from_kwargs()")
return s
def session_from_config(config_file):
"""Get a session object using credentials from a config file
Parameters
----------
config_file : str
path to config file
Returns
-------
OAuth2Session
Authenticated Requests session
"""
def save_token(token_to_save):
"""Save off the token back to the config file."""
if not SAVE_TOKEN:
return
if not "ard_token" in set(cfg.sections()):
cfg.add_section("ard_token")
cfg.set("ard_token", "json", json.dumps(token_to_save))
with open(config_file, "w") as sink:
cfg.write(sink)
# Read the config file (ini format).
cfg = ConfigParser(interpolation=None)
if not cfg.read(config_file):
raise RuntimeError("No ini file found at {} to parse.".format(config_file))
client_id = "dummy_client_id(not-required)"
client_secret = "dummy_client_secret(not-required)"
# the ini file has the optional ability to set an auth url (useful for dev)
if not cfg.has_option("ard", "auth_url"):
auth_url = AUTH_URL
else:
auth_url = cfg.get("ard", "auth_url")
# See if we have a token stored in the config, and if not, get one.
if "ard_token" in set(cfg.sections()):
# Parse the token from the config.
token = json.loads(cfg.get("ard_token", "json"))
s = OAuth2Session(
client_id,
client=LegacyApplicationClient(client_id, token=token),
auto_refresh_url=auth_url,
auto_refresh_kwargs={"client_id": client_id, "client_secret": client_secret},
token_updater=save_token,
)
s.token = token
# hit the server to trigger a refresh if the token has expired
try:
s.get(ard_url("auth", "self"))
return s
except MissingTokenError:
pass
# No pre-existing token or the refresh expired, so we request one from the API.
s = OAuth2Session(
client_id,
client=LegacyApplicationClient(client_id),
auto_refresh_url=auth_url,
auto_refresh_kwargs={"client_id": client_id, "client_secret": client_secret},
token_updater=save_token,
)
# Get the token and save it to the config.
token = s.fetch_token(
auth_url,
username=cfg.get("ard", "user_name"),
password=cfg.get("ard", "user_password"),
client_id=client_id,
client_secret=client_secret,
auth=False,
)
save_token(token)
return s
def write_config(username, password):
"""Write a config file
The config file will be written to ~/.ard-config.
Parameters
----------
username : str
Username
password: str
Password
"""
config_file = os.path.expanduser("~/.ard-config")
cfg = ConfigParser()
cfg.read(config_file)
# create or update ard section
if "ard" not in cfg.sections():
cfg["ard"] = {}
if username:
cfg["ard"]["user_name"] = username
if password:
cfg["ard"]["user_password"] = password
# reset the token
cfg.remove_section("ard_token")
with open(config_file, "w") as configfile:
cfg.write(configfile)
def read_token():
"""Read the token from the config file
Returns
-------
str
Access token
"""
config_file = os.path.expanduser("~/.ard-config")
cfg = ConfigParser()
cfg.read(config_file)
print(json.loads(cfg["ard_token"]["json"])["access_token"])
Functions
def get_client_session(client_id=None, client_secret=None)
-
Get an authenticated session using client credentials (Maxar internal only)
If a client ID and secret are not passed as kwargs, the env vars ARD_CLIENT_ID and ARD_CLIENT_SECRET will be used if they exist.
Expand source code
def get_client_session(client_id=None, client_secret=None): """Get an authenticated session using client credentials (Maxar internal only) If a client ID and secret are not passed as kwargs, the env vars ARD_CLIENT_ID and ARD_CLIENT_SECRET will be used if they exist.""" global _CLIENT_SESSION_CACHE if _CLIENT_SESSION_CACHE[client_id] is None: _CLIENT_SESSION_CACHE[client_id] = _get_client_session( client_id=client_id, client_secret=client_secret ) return _CLIENT_SESSION_CACHE
def get_self(session)
-
Fetch the account SELF endpoint of the current user
Parameters
session
:OAuth2Session
- The session object created for the current user
Returns
dict
:SELF api payload
Expand source code
def get_self(session): """Fetch the account SELF endpoint of the current user Parameters ---------- session : OAuth2Session The session object created for the current user Returns ------- dict: SELF api payload""" r = session.get(ard_url("auth", "self")) r.raise_for_status() return r.json()
def get_user_session()
-
Get an authenticated Requests session for communicating with the ARD API
Example
>>> r = get_user_session() >>> resp = r.get('https://ard-dev.geobigdata.io/api/v1/select/request/5542092796671610498')
Returns
OAuth2Session
- Authenticated Requests session
Notes
For credentials, create a file in your home directory called .ard-config with these contents:
[ard] user_name = <your_user_name> user_password = <your_password>
or set these environment variables:
ARD_USERNAME ARD_PASSWORD
Expand source code
def get_user_session(): """Get an authenticated Requests session for communicating with the ARD API Example ------- >>> r = get_user_session() >>> resp = r.get('https://ard-dev.geobigdata.io/api/v1/select/request/5542092796671610498') Returns ------- OAuth2Session Authenticated Requests session Notes ----- For credentials, create a file in your home directory called .ard-config with these contents: [ard] user_name = <your_user_name> user_password = <your_password> or set these environment variables: ARD_USERNAME ARD_PASSWORD """ global _USER_SESSION_CACHE if _USER_SESSION_CACHE is None: _USER_SESSION_CACHE = _get_user_session() return _USER_SESSION_CACHE