Package max_ard
Tools for working with Maxar ARD
Provides
- Tools for interacting with the ARD API and files
- SDK objects
- CLI tools
Expand source code
"""Tools for working with Maxar ARD
Provides
--------
- Tools for interacting with the ARD API and files
- SDK objects
- CLI tools
"""
from max_ard.admin import AccountManager
from max_ard.ard_collection import ARDCollection
from max_ard.order import Order
from max_ard.select import Select, SelectResult
from max_ard.monitor import Monitor
__all__ = ["ARDCollection", "Order", "Select", "SelectResult", "AccountManager"]
Sub-modules
max_ard.admin
-
Perform admin functions …
max_ard.ard_collection
-
Collection objects representing stored ARD data …
max_ard.base_collections
-
Base collection types for ARD …
max_ard.commands
-
Command line tools for ARD
max_ard.dependency_support
-
Helpers for optional dependencies and platform quirks
max_ard.exceptions
-
ARD Exceptions
max_ard.io
-
Export ARD collections to other file formats …
max_ard.metadata
-
Search ARD tile metadata …
max_ard.monitor
-
ARD Monitors …
max_ard.order
-
Place ARD Orders …
max_ard.outputs
max_ard.processing
-
Tools and utilities for processing ARD images …
max_ard.select
-
Create ARD Selects and view results …
max_ard.session
-
Authenticated sessions for communicating with the ARD API endpoints
max_ard.storage
-
Utilities for storage backends …
Classes
class ARDCollection (path: str, aoi: Optional[Any] = None, acq_id_in: Optional[Iterable[str]] = None, zone_in: Optional[Iterable[int]] = None, earliest_date: Union[str, datetime.datetime, datetime.date, None] = None, latest_date: Union[str, datetime.datetime, datetime.date, None] = None, profile: Optional[str] = None, public: Optional[bool] = False, **kwargs)
-
ARDCollections represent collections of S3 tiles. Currently the tiles can be stored in S3 or locally.
Parameters
path
:str
- Path to S3 prefix or STAC collection.
aoi
:shapely.geometry
orstr
, optional- Limit to finding tiles that cover this AOI, can be shapely geometry or most textual representations.
acq_id_in
:iterable
ofstr
, optional- Limit to finding tiles from these acquisitions.
zone_in
:iterable
ofint
, optional- Limit to finding tiles in these zones.
earliest_date
:str
ordatetime.date
ordatetime.datetime
, optional- Limit to finding tiles after this date (strings must be YYYY-MM-DD).
latest_date
:str
ordatetime.date
ordatetime.datetime
, optional- Limit or finding tiles before this date.
profile
:str
, optional- AWS Profile to use when tiles are in S3.
public
:bool
- Access cloud data without authentication (for public buckets).
The following parameters are also settable attributes and will trigger a rescan
Attributes
path
aoi
acq_id_in
zone_in
earliest_date
latest_date
Expand source code
class ARDCollection(BaseCollection): """ARDCollections represent collections of S3 tiles. Currently the tiles can be stored in S3 or locally. Parameters ---------- path : str Path to S3 prefix or STAC collection. aoi : shapely.geometry or str, optional Limit to finding tiles that cover this AOI, can be shapely geometry or most textual representations. acq_id_in : iterable of str, optional Limit to finding tiles from these acquisitions. zone_in : iterable of int, optional Limit to finding tiles in these zones. earliest_date : str or datetime.date or datetime.datetime, optional Limit to finding tiles after this date (strings must be YYYY-MM-DD). latest_date : str or datetime.date or datetime.datetime, optional Limit or finding tiles before this date. profile : str, optional AWS Profile to use when tiles are in S3. public : bool Access cloud data without authentication (for public buckets). The following parameters are also settable attributes and will trigger a rescan Attributes ---------- path aoi acq_id_in zone_in earliest_date latest_date """ def __init__( self, path: str, aoi: Optional[Any] = None, acq_id_in: Optional[Iterable[str]] = None, zone_in: Optional[Iterable[int]] = None, earliest_date: Optional[Union[str, datetime, date]] = None, latest_date: Optional[Union[str, datetime, date]] = None, profile: Optional[str] = None, public: Optional[bool] = False, **kwargs, ) -> None: self._dirty = True self._updating = False super().__init__() # TODO we may want to normalize AOIs to WGS84 # but right now `covers` is probably capable enough self.aoi = aoi # disable authentication for public buckets # 'anon' is what fsspec calls it but 'public' makes more sense # however 'anon' can also have implications for Azure so # we should be able to override it just in case if "anon" in kwargs: anon = kwargs["anon"] else: # using the recommend Azure connection string access, anon needs to be True if path.startswith("az"): anon = True else: anon = public # For GDAL S3 locations we can turn off signing so # you can have expired credentials if anon and path.startswith("s3"): os.environ["AWS_NO_SIGN_REQUEST"] = "YES" # validate some inputs that have been problematic in the past assert zone_in is None or all(type(z) == int for z in zone_in), "Zones must be integers" assert zone_in is None or all(z - 1 in range(60) for z in zone_in), "Invalid zone numbers" if not zone_in: zone_in = None self.zone_in = zone_in assert acq_id_in is None or all( type(c) == str for c in acq_id_in ), "Catalog IDs must be strings" if not acq_id_in: acq_id_in = None self.acq_id_in = acq_id_in # store dates as strings, reformat if needed def format_date(d): if d is None: return None try: return d.strftime("%Y-%m-%d") except: assert re.match( r"\d{4}-\d{2}-\d{2}", d ), "Dates must be YYYY-MM-DD strings, or date/datetime objects" return d self.earliest_date = format_date(earliest_date) self.latest_date = format_date(latest_date) # Set up the path and initialize the filesystem source self.path = path # this might need to be smarter (os.path?) for windows slashes if self.path[-1] == "/": self.path = self.path[:-1] if os.path.exists(self.path): self.storage_type = "file" self.fs_path = os.path.abspath(self.path) else: parsed = urlparse(self.path) if parsed.scheme == "": raise ValueError("Local path does not exist") if parsed.scheme not in ["s3", "gs", "az"]: raise ValueError("Unrecognized protocol (use s3://, gs://, or az://") self.storage_type = parsed.scheme # might not need this, s3fs doesn't care about paths leading with protocols # need to check gdal, etc self.fs_path = parsed.netloc + parsed.path if self.storage_type == "az": # workarounds for to make azure credentials easier to deal with from max_ard.dependency_support.azure import sync_envvars sync_envvars() self.fs = filesystem(self.storage_type, anon=anon, profile=profile) try: self.fs.ls(self.path) except: raise ValueError("Access error: check your path for errors and access permissions") def __setattr__(self, name, value): """Some attributes are read-only properties Related setters need to reset the collection state""" if name == "acq_ids": raise ValueError(".acq_ids is read-only - set .acq_id_in instead") if name == "zones": raise ValueError(".zones is read-only - set .zone_in instead") if name == "start_date": raise ValueError(".start_date is read-only - set .earliest_date instead") if name == "acq_ids": raise ValueError(".end_date is read-only - set .latest_date instead") if name in ["acq_id_in", "zone_in", "earliest_date", "latest_date", "aoi"]: # resets the bins self._dirty = True object.__setattr__(self, name, value) def __getattribute__(self, name): dirty = object.__getattribute__(self, "_dirty") updating = object.__getattribute__(self, "_updating") if ( dirty and not updating and name in [ "tiles", "acquisitions", "acquisition_ids", "stacks", "cells", "get_stack", "get_acquisition", "dates", "earliest_date", "latest_date", "zones", "read_windows", "write_windows", ] ): object.__setattr__(self, "_updating", True) self._scan_files() object.__setattr__(self, "_dirty", False) object.__setattr__(self, "_updating", False) return object.__getattribute__(self, name) def __repr__(self) -> str: return f"<ARDCollection at {self.path}/>" def _scan_files(self) -> None: self._reset() if self.aoi is not None: cells = set([f"{c.zone}/{c.quadkey}" for c in covers(self.aoi)]) else: cells = [] # STAC source if self.path.endswith("json"): with self.fs.open(self.path) as f: doc = json.load(f) # STAC Item if "type" in doc.keys(): items = [self.path] # STAC Collection else: root_path = self.fs_path.split("order_collections")[0] items = [] for link in doc["links"]: if link["rel"] != "child": continue path = root_path + link["href"].replace("../", "") with self.fs.open(path) as f: links = json.load(f)["links"] for link in links: if link["rel"] == "item": path = root_path + link["href"].replace("../", "") items.append(path) # Filesystem source else: # build leading glob pattern based on zone & quadkey if self.aoi is not None: # shard on first 5 digits on quadkey # for parallel fetches qkbs = set() for qk in cells: qkbs.add(qk[:5]) paths = [f"{k}*/*/*.json" for k in qkbs] else: if not self.zone_in or len(self.zone_in) == 0: paths = ["*/*/*/*.json"] else: paths = [f"{z}/*/*/*.json" for z in self.zone_in] items = [] for path in paths: full_path = f"{self.path}/{path}" for item in self.fs.glob(full_path): items.append(item) # Filter out items for item in items: tile = ARDTile.from_doc(self.fs, item) if self.aoi is not None: if f"{tile.zone}/{tile.quadkey}" not in cells: continue if self.acq_id_in is not None and tile.acq_id not in self.acq_id_in: continue if self.earliest_date is not None: if tile.date < self.earliest_date: continue if self.latest_date is not None: if tile.date > self.latest_date: continue self.add_tile(tile) def clear_filesystem_cache(self): """ Clear the local cache of a remote filesystem Remote file systems (S3, Azure, Google Cloud) cache files locally for speed. If the remote files have changed while using an ARDCollectino, you can clear the cached files so that new files will be loaded. Parameters ---------- Returns ------- """ self.fs.clear_instance_cache() @classmethod def from_order(cls, order, **kwargs): """ Create an ARDCollection from an order ID. Accepts all filter keywords as used by class initialization. Parameters ---------- order_id : str or Order Order object or Order ID to open. **kwargs Filter keywords as used by class initialization. Returns ------- ARDCollection """ if type(order) == str: order = Order.from_id(order) if not order.finished: raise NotFinished output_config = order.response.order.output_config if "bucket" in output_config: bucket = output_config["bucket"] prefix = output_config["prefix"] protocol = "s3" elif "amazon_s3" in output_config: bucket = output_config["amazon_s3"]["bucket"] prefix = output_config["amazon_s3"]["prefix"] protocol = "s3" elif "google_cloud_storage" in order: bucket = output_config["google_cloud_storage"]["bucket"] prefix = output_config["google_cloud_storage"]["prefix"] protocol = "gs" elif "azure_blob_storage" in order: bucket = output_config["azure_blob_storage"]["container"] prefix = output_config["azure_blob_storage"]["prefix"] protocol = "az" path = f"{protocol}://{bucket}/{prefix}/order_collections/{order.order_id}_root_collection.json" self = cls(path, **kwargs) return self def read_windows(self, *args, **kwargs): """See `max_ard.processing.read_windows`""" return read_windows(self, *args, **kwargs) def write_windows(self, *args, **kwargs): """See `max_ard.processing.write_windows`""" return write_windows(self, *args, **kwargs)
Ancestors
Static methods
def from_order(order, **kwargs)
-
Create an ARDCollection from an order ID.
Accepts all filter keywords as used by class initialization.
Parameters
order_id
:str
orOrder
- Order object or Order ID to open.
**kwargs
- Filter keywords as used by class initialization.
Returns
Expand source code
@classmethod def from_order(cls, order, **kwargs): """ Create an ARDCollection from an order ID. Accepts all filter keywords as used by class initialization. Parameters ---------- order_id : str or Order Order object or Order ID to open. **kwargs Filter keywords as used by class initialization. Returns ------- ARDCollection """ if type(order) == str: order = Order.from_id(order) if not order.finished: raise NotFinished output_config = order.response.order.output_config if "bucket" in output_config: bucket = output_config["bucket"] prefix = output_config["prefix"] protocol = "s3" elif "amazon_s3" in output_config: bucket = output_config["amazon_s3"]["bucket"] prefix = output_config["amazon_s3"]["prefix"] protocol = "s3" elif "google_cloud_storage" in order: bucket = output_config["google_cloud_storage"]["bucket"] prefix = output_config["google_cloud_storage"]["prefix"] protocol = "gs" elif "azure_blob_storage" in order: bucket = output_config["azure_blob_storage"]["container"] prefix = output_config["azure_blob_storage"]["prefix"] protocol = "az" path = f"{protocol}://{bucket}/{prefix}/order_collections/{order.order_id}_root_collection.json" self = cls(path, **kwargs) return self
Methods
def clear_filesystem_cache(self)
-
Clear the local cache of a remote filesystem
Remote file systems (S3, Azure, Google Cloud) cache files locally for speed. If the remote files have changed while using an ARDCollectino, you can clear the cached files so that new files will be loaded.
Parameters
Returns
Expand source code
def clear_filesystem_cache(self): """ Clear the local cache of a remote filesystem Remote file systems (S3, Azure, Google Cloud) cache files locally for speed. If the remote files have changed while using an ARDCollectino, you can clear the cached files so that new files will be loaded. Parameters ---------- Returns ------- """ self.fs.clear_instance_cache()
def read_windows(self, *args, **kwargs)
-
See
read_windows()
Expand source code
def read_windows(self, *args, **kwargs): """See `max_ard.processing.read_windows`""" return read_windows(self, *args, **kwargs)
def write_windows(self, *args, **kwargs)
-
See
write_windows()
Expand source code
def write_windows(self, *args, **kwargs): """See `max_ard.processing.write_windows`""" return write_windows(self, *args, **kwargs)
Inherited members
class AccountManager (account_id=None, session=None)
-
Manages account-related actions
Arguments
account_id
:str (optional)
- Account ID to manage. If not provided, will use the current user's account
session
:Requests max_ard.session object (optional)
- The session to use to communicate with the API. If not provided, uses the current user's authenticated session
Expand source code
class AccountManager: """Manages account-related actions Arguments --------- account_id : str (optional) Account ID to manage. If not provided, will use the current user's account session : Requests session object (optional) The session to use to communicate with the API. If not provided, uses the current user's authenticated session """ __all__ = ["get_account_usage", "get_user_usage"] def __init__(self, account_id=None, session=None) -> None: if not session: session = get_user_session() self.session = session if not account_id: account_id = get_self(self.session)["user"]["account_id"] self.account_id = account_id def admin_url(self, *args): return ard_url("admin", "account", self.account_id, *args) @property def properties(self): return self.session.get(self.admin_url()).json()["account"] ######## # Usage ######## def _validate_dates(self, *args): """Validate that dates are YYYY-MM-DD Arguments --------- *args : str One or more date strings to validate Returns ------- None Raises ------ ValueError If one or more dates was not YYYY-MM-DD""" for date in args: if date not in [None, ""]: try: datetime.strptime(date, "%Y-%m-%d") except ValueError: raise ValueError("Dates must be YYYY-MM-DD") def get_account_usage(self, start_date=None, end_date=None): """Get account-level data usage Parameters ---------- start_date : str (optional) Start date to calculate usage start_date : str (optional) End date to calculate usage Returns ------- AdminUsage Object model of data usage """ url = ard_url("usage", "account", self.account_id) self._validate_dates(start_date, end_date) params = {"start_date": start_date, "end_date": end_date} r = self.session.get(url, params=params) return AdminUsage(**r.json()["usage"]) def get_user_usage(self, username=None, start_date=None, end_date=None): """Get user-level data usage Parameters ---------- username : str (optional) User name to calculate usage for, if not provided the current user is used start_date : str (optional) Start date to calculate usage end_date : str (optional) End date to calculate usage Returns ------- AdminUsage Object model of data usage """ if not username: username = get_self(self.session)["user"]["user_id"] url = ard_url("usage", "user", username) self._validate_dates(start_date, end_date) params = {"start_date": start_date, "end_date": end_date} r = self.session.get(url, params=params) return AdminUsage(**r.json()["usage"]) ##### # Credentials Storage ##### def add_credentials(self, name, credentials, description=""): """Saves credentials in the Crendentials API The stored credentials key (SAS Url for Azure or Base64-encoded Credentials JSON for GCS) is never returned. Only the name, description, and other metadata about the credentials object is provided. This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data. Parameters ---------- name : str Name of the stored credentials, should have URL-friendly characters (no spaces) credentials : str Credentials secret to store (SAS Url for Azure or Base64 Credentials JSON for GCS). If this is a path to a GCS JSON file, it will be read and Base64 encoded. description : str (optional) Description of the credentials Returns ------- dict API response JSON dictionary """ if urllib.parse.quote(name) != name: raise ValueError( "Stored credential names should not use characters that require URL-encoding" ) url = self.admin_url("credentials", name) try: with open(credentials, "rb") as binary_file: encoded = base64.b64encode(binary_file.read()) credentials = encoded.decode("ascii") except (FileNotFoundError, OSError): pass payload = {"credentials": credentials, "description": description} response = self.session.put(url, json=payload) if response.status_code == 204: return None else: return response.json() def get_credentials(self, name, raw=False): """Retrieves information about stored credentials. The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned, only the name, description, and other metadata about the credentials object. This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data. Parameters ---------- name : str Name of the stored credentials raw : bool (optional) Return the response JSON Returns ------- RegisteredCredentials Object model of credentials """ url = self.admin_url("credentials", name) response = self.session.get(url) if raw: return response.json() else: return RegisteredCredentials(**response.json()["registered_credentials"]) def list_credentials(self, raw=False): """Retrieves all stored credentials in an account. The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned, only the name, description, and other metadata about the credentials object. This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data. Parameters ---------- raw : bool (optional) Return the response JSON Returns ------- List of RegisteredCredentials List of object models of credentials """ # responses can be paginated id_key = lambda credential: credential["registered_credentials"]["credentials_id"] url = self.admin_url("credentials") credentials = paginated_response(self.session, url, id_key) if raw: return credentials else: object_key = lambda credential: credential["registered_credentials"] return hydrate(RegisteredCredentials, credentials, key=object_key) def delete_credentials(self, name): """Deletes stored credentials by name. This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data. Parameters ---------- name : str Name of the credential object """ url = self.admin_url("credentials", name) return self.session.delete(url) ##### # Monitors ##### def list_monitors(self, raw=False): """Retrieves all monitors for an account. Parameters ---------- raw : bool (optional) Return the response JSON Returns ------- List of Monitors List of object models of monitors """ id_key = lambda monitor: monitor["monitor_id"] url = self.admin_url("monitor") monitors = paginated_response(self.session, url, id_key) if raw: return monitors else: return [Monitor.from_response(monitor) for monitor in monitors]
Instance variables
var properties
-
Expand source code
@property def properties(self): return self.session.get(self.admin_url()).json()["account"]
Methods
def add_credentials(self, name, credentials, description='')
-
Saves credentials in the Crendentials API
The stored credentials key (SAS Url for Azure or Base64-encoded Credentials JSON for GCS) is never returned. Only the name, description, and other metadata about the credentials object is provided.
This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data.
Parameters
name
:str
- Name of the stored credentials, should have URL-friendly characters (no spaces)
credentials
:str
- Credentials secret to store (SAS Url for Azure or Base64 Credentials JSON for GCS). If this is a path to a GCS JSON file, it will be read and Base64 encoded.
description
:str (optional)
- Description of the credentials
Returns
dict
- API response JSON dictionary
Expand source code
def add_credentials(self, name, credentials, description=""): """Saves credentials in the Crendentials API The stored credentials key (SAS Url for Azure or Base64-encoded Credentials JSON for GCS) is never returned. Only the name, description, and other metadata about the credentials object is provided. This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data. Parameters ---------- name : str Name of the stored credentials, should have URL-friendly characters (no spaces) credentials : str Credentials secret to store (SAS Url for Azure or Base64 Credentials JSON for GCS). If this is a path to a GCS JSON file, it will be read and Base64 encoded. description : str (optional) Description of the credentials Returns ------- dict API response JSON dictionary """ if urllib.parse.quote(name) != name: raise ValueError( "Stored credential names should not use characters that require URL-encoding" ) url = self.admin_url("credentials", name) try: with open(credentials, "rb") as binary_file: encoded = base64.b64encode(binary_file.read()) credentials = encoded.decode("ascii") except (FileNotFoundError, OSError): pass payload = {"credentials": credentials, "description": description} response = self.session.put(url, json=payload) if response.status_code == 204: return None else: return response.json()
def admin_url(self, *args)
-
Expand source code
def admin_url(self, *args): return ard_url("admin", "account", self.account_id, *args)
def delete_credentials(self, name)
-
Deletes stored credentials by name.
This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data.
Parameters
name
:str
- Name of the credential object
Expand source code
def delete_credentials(self, name): """Deletes stored credentials by name. This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data. Parameters ---------- name : str Name of the credential object """ url = self.admin_url("credentials", name) return self.session.delete(url)
def get_account_usage(self, start_date=None, end_date=None)
-
Get account-level data usage
Parameters
start_date
:str (optional)
- Start date to calculate usage
start_date
:str (optional)
- End date to calculate usage
Returns
AdminUsage
- Object model of data usage
Expand source code
def get_account_usage(self, start_date=None, end_date=None): """Get account-level data usage Parameters ---------- start_date : str (optional) Start date to calculate usage start_date : str (optional) End date to calculate usage Returns ------- AdminUsage Object model of data usage """ url = ard_url("usage", "account", self.account_id) self._validate_dates(start_date, end_date) params = {"start_date": start_date, "end_date": end_date} r = self.session.get(url, params=params) return AdminUsage(**r.json()["usage"])
def get_credentials(self, name, raw=False)
-
Retrieves information about stored credentials.
The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned, only the name, description, and other metadata about the credentials object.
This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data.
Parameters
name
:str
- Name of the stored credentials
raw
:bool (optional)
- Return the response JSON
Returns
RegisteredCredentials
- Object model of credentials
Expand source code
def get_credentials(self, name, raw=False): """Retrieves information about stored credentials. The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned, only the name, description, and other metadata about the credentials object. This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data. Parameters ---------- name : str Name of the stored credentials raw : bool (optional) Return the response JSON Returns ------- RegisteredCredentials Object model of credentials """ url = self.admin_url("credentials", name) response = self.session.get(url) if raw: return response.json() else: return RegisteredCredentials(**response.json()["registered_credentials"])
def get_user_usage(self, username=None, start_date=None, end_date=None)
-
Get user-level data usage
Parameters
username
:str (optional)
- User name to calculate usage for, if not provided the current user is used
start_date
:str (optional)
- Start date to calculate usage
end_date
:str (optional)
- End date to calculate usage
Returns
AdminUsage
- Object model of data usage
Expand source code
def get_user_usage(self, username=None, start_date=None, end_date=None): """Get user-level data usage Parameters ---------- username : str (optional) User name to calculate usage for, if not provided the current user is used start_date : str (optional) Start date to calculate usage end_date : str (optional) End date to calculate usage Returns ------- AdminUsage Object model of data usage """ if not username: username = get_self(self.session)["user"]["user_id"] url = ard_url("usage", "user", username) self._validate_dates(start_date, end_date) params = {"start_date": start_date, "end_date": end_date} r = self.session.get(url, params=params) return AdminUsage(**r.json()["usage"])
def list_credentials(self, raw=False)
-
Retrieves all stored credentials in an account.
The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned, only the name, description, and other metadata about the credentials object.
This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data.
Parameters
raw
:bool (optional)
- Return the response JSON
Returns
List
ofRegisteredCredentials
- List of object models of credentials
Expand source code
def list_credentials(self, raw=False): """Retrieves all stored credentials in an account. The stored credentials key (SAS Url for Azure or Credentials JSON for GCS) is never returned, only the name, description, and other metadata about the credentials object. This is only available to Admin users. However all users in the account may use a stored credential name to provide write access when ordering data. Parameters ---------- raw : bool (optional) Return the response JSON Returns ------- List of RegisteredCredentials List of object models of credentials """ # responses can be paginated id_key = lambda credential: credential["registered_credentials"]["credentials_id"] url = self.admin_url("credentials") credentials = paginated_response(self.session, url, id_key) if raw: return credentials else: object_key = lambda credential: credential["registered_credentials"] return hydrate(RegisteredCredentials, credentials, key=object_key)
def list_monitors(self, raw=False)
-
Retrieves all monitors for an account.
Parameters
raw
:bool (optional)
- Return the response JSON
Returns
List
ofMonitors
- List of object models of monitors
Expand source code
def list_monitors(self, raw=False): """Retrieves all monitors for an account. Parameters ---------- raw : bool (optional) Return the response JSON Returns ------- List of Monitors List of object models of monitors """ id_key = lambda monitor: monitor["monitor_id"] url = self.admin_url("monitor") monitors = paginated_response(self.session, url, id_key) if raw: return monitors else: return [Monitor.from_response(monitor) for monitor in monitors]
class Order (acquisitions=None, select_id=None, destination=None, output_config=None, settings={}, intersects=None, bbox=None, role_arn=None, dry_run=False, bba=False, metadata={}, session=None)
-
An ARD API Order object
Parameters
acquisitions
:iterable
ofstr
ordict
, optional- An iterable of acquisitions to order, see Notes
select_id
:str
- A Select ID to order, see Notes
- destination:
- For S3 locations only, a path to storage location such as s3://my-bucket/my-prefix
- output_config:
- If not using
destination
, an output configuration dictionary, see API documentation for examples intersects
:Geometry-like objects, str (optional))
- Geometry to intersect, can be most geometry or spatial objects, or a path (see Notes)
bbox
:interable
ofnumeric
, optional- Like
intersects
, a bounding box in WGS84 coordinates, [XMIN YMIN XMAX YMAX] role_arn
:str
, optional- A trusted Role ARN for the writer to assume so it can write tiles. This is not used if the s3 bucket policy allows writing tiles to the bucket.
dry_run
:bool
, optional- When true, runs pre-order checks to check if order is valid but does not generate imagery
bba
:bool
, optional- When true, Block Bundle Adjustment will be applied to the order.
metadata
:dict
, optional- User-supplied metadata
settings
:dict
, optional- Dictionary of settings to override outputs, see Notes
session
:max_ard.session object
, optional- A user_session or application_session object
Attributes
session
:max_ard.session object
orNone
- A user_session or application_session object
dry_run
:bool
- as above
submitted
:bool
- True if Select has been submitted via
Order.submit()
request
:Pydantic model
- Parameters are loaded into a Pydantic model of the HTTP API request
response
:Pydantic model
- Pydantic model of the server response to API call. Status and submit calls return the same payload.
Notes
An order must specify
acquisitions
orselect_id
.If ordering by select ID, do not include acquisition IDs or an AOI in the order request.
Acquisitions can be a list of acqusitions IDs, or a a list of dictionaries, with the keys
id
andcells
. If no cells are specified, it is assumed all cells are wanted (subject to clipping by an AOI or BBOX):acquisitions=["103001009E8G3C90"]
or
acquisitions=[ { "id": "103001007B478000", "cells": ["Z17-031313123113", "Z17-031313123112"] }, { "id": "103001009E8C7C00", "cells": ["Z17-031313123113"] }, { "id": "103001009E8G3C90" } ]
Intersects inputs: Geometry objects: Shapely shapes, objects supporting geo_interface, geojson-like dicts, geojson and wkt strings Geometry iterables: iterables of above, Fiona readers File paths: most spatial file formats. WKT and Geojson supported with base install, other formats require Fiona for reading
Settings dictionary defaults: You can override one or more of the following values:
{ "bundle_adjust": false, "cloud_mask": true, "data_mask": true, "healthy_vegetation_mask": true, "ms_analytic": true, "ms_saturation_mask": true, "pan_analytic": true, "pan_flare_mask": true, "terrain_shadow_mask": true, "visual": true, "water_mask": true }
Expand source code
class Order(Submitted): """An ARD API Order object Parameters ---------- acquisitions : iterable of str or dict, optional An iterable of acquisitions to order, see Notes select_id : str A Select ID to order, see Notes destination: For S3 locations only, a path to storage location such as s3://my-bucket/my-prefix output_config: If not using `destination`, an output configuration dictionary, see API documentation for examples intersects : Geometry-like objects, str (optional)) Geometry to intersect, can be most geometry or spatial objects, or a path (see Notes) bbox : interable of numeric, optional Like `intersects`, a bounding box in WGS84 coordinates, [XMIN YMIN XMAX YMAX] role_arn : str, optional A trusted Role ARN for the writer to assume so it can write tiles. This is not used if the s3 bucket policy allows writing tiles to the bucket. dry_run : bool, optional When true, runs pre-order checks to check if order is valid but does not generate imagery bba : bool, optional When true, Block Bundle Adjustment will be applied to the order. metadata : dict, optional User-supplied metadata settings : dict, optional Dictionary of settings to override outputs, see Notes session : session object, optional A user_session or application_session object Attributes ---------- session : session object or None A user_session or application_session object dry_run : bool as above submitted : bool True if Select has been submitted via `max_ard.order.Order.submit` request : Pydantic model Parameters are loaded into a Pydantic model of the HTTP API request response : Pydantic model Pydantic model of the server response to API call. Status and submit calls return the same payload. Notes ----- An order must specify `acquisitions` or `select_id`. If ordering by select ID, do not include acquisition IDs or an AOI in the order request. Acquisitions can be a list of acqusitions IDs, or a a list of dictionaries, with the keys `id` and `cells`. If no cells are specified, it is assumed all cells are wanted (subject to clipping by an AOI or BBOX): acquisitions=["103001009E8G3C90"] or acquisitions=[ { "id": "103001007B478000", "cells": ["Z17-031313123113", "Z17-031313123112"] }, { "id": "103001009E8C7C00", "cells": ["Z17-031313123113"] }, { "id": "103001009E8G3C90" } ] Intersects inputs: Geometry objects: Shapely shapes, objects supporting __geo_interface__, geojson-like dicts, geojson and wkt strings Geometry iterables: iterables of above, Fiona readers File paths: most spatial file formats. WKT and Geojson supported with base install, other formats require Fiona for reading Settings dictionary defaults: You can override one or more of the following values: { "bundle_adjust": false, "cloud_mask": true, "data_mask": true, "healthy_vegetation_mask": true, "ms_analytic": true, "ms_saturation_mask": true, "pan_analytic": true, "pan_flare_mask": true, "terrain_shadow_mask": true, "visual": true, "water_mask": true }""" def __init__( self, acquisitions=None, select_id=None, destination=None, output_config=None, settings={}, intersects=None, bbox=None, role_arn=None, dry_run=False, bba=False, metadata={}, session=None, ): self.session = session or get_user_session() # check booleans aren't strings if type(dry_run) is not bool: raise ValueError(f"Parameter `dry_run` needs to be a boolean, not {type(dry_run)}") if type(bba) is not bool: raise ValueError(f"Parameter `bba` needs to be a boolean, not {type(bba)}") # bba to be moved to `settings` instead of top level param if "bundle_adjust" not in settings: settings["bundle_adjust"] = bba # set up the output config if destination is not None: destination = destination.replace("s3://", "") parts = destination.split("/", 1) bucket = parts[0] try: prefix = parts[1] except IndexError: prefix = "" # old style output config destination = {"bucket": bucket, "prefix": prefix, "role_arn": role_arn} output_config = self._validate_output({"destination": destination}) else: if output_config is not None: output_config = self._validate_output(output_config) # set up acquisitions if type(acquisitions) == str: acquisitions = [Acquisition(id=acquisitions)] elif type(acquisitions) == list or type(acquisitions) == tuple: if len(acquisitions) > 0: if type(acquisitions[0]) == str: acquisitions = [Acquisition(id=v) for v in acquisitions] elif type(acquisitions) == dict: acquisitions = [Acquisition(id=k, cells=v) for k, v in acquisitions.items()] elif isinstance(acquisitions, BaseCollection): acquisitions = [Acquisition(**acq) for acq in acquisitions.as_order()] # if the intersects is a Shapely geom, convert it to wkt if intersects is not None: intersects = convert_to_shapely(intersects).wkt self.request = OrderRequest( acquisitions=acquisitions, select_id=select_id, intersects=intersects, bbox=bbox, output_config=output_config, settings=settings, dry_run=dry_run, metadata=metadata, ) self.dry_run = dry_run self.response = None def _validate_output(self, output_config): platforms = { "azure_blob_storage": AZ_Config, "google_cloud_storage": GCS_Config, "amazon_s3": S3_Config, "destination": Output_Config, } platform = list(output_config.keys())[0] config_model = platforms.get(platform, None) if config_model is None: raise ValueError( "Output config format not recognized, please see documentation for examples" ) params = output_config[platform].copy() # validation on object creation config_obj = config_model(**params) params = {k: v for k, v in config_obj.dict().items() if v is not None} # convert credential files to b64 encoding if platform == "google_cloud_storage": if "credentials_id" not in params: location = params["service_credentials"] try: with open(location, "rb") as binary_file: encoded = base64.b64encode(binary_file.read()) params["service_credentials"] = encoded.decode("ascii") except (FileNotFoundError, OSError): try: creds = base64.b64decode(location) assert "type" in json.loads(creds) except: raise ValueError( "GCS service credentials should be either a Base64-encoded string" + " of the JSON credentials file contents or a path to the JSON credentials file" ) else: if "service_credentials" in params: c_id = params["credentials_id"] warnings.warn( "Both a stored credential ID and service credential argument were provided. " + f'The store credential ID "{c_id}" will be used' ) del params["service_credentials"] if platform != "destination": return {platform: params} else: return params def add_email_notification(self, address): """Add an email notification to the order Parameters ---------- address : str Email address to receive order notifications""" self.request.notifications.append(EmailNotification(address=address)) def add_sns_notification(self, topic_arn): """Add an AWS SNS notification topic to receive order notifications Parameters ---------- topic_arn : str AWS SNS topic ARN to recieve order notifications""" self.request.notifications.append(SNSNotification(topic_arn=topic_arn)) def __repr__(self): try: return f"<Order {self.order_id} ({self.status})>" except NotSubmitted: return "<Order (Not submitted)>" @property def submitted(self): if self.response: return self.response.id is not None else: return False @property @Submitted.required def finished(self): """The Order finished processing but may have failed""" return self.status != "RUNNING" @property @Submitted.required def running(self): """The Order is running""" return self.status == "RUNNING" @property @Submitted.required def succeeded(self): """The Order has finished running and has succeeded""" return self.status == "SUCCEEDED" @property @Submitted.required def usage(self): if self.response.order is None: # just submitted, refresh the order status response = self.get_order(self.order_id) self.response = OrderResponse(**response) return self.response.order.usage @property @Submitted.required def failed(self): """The Order has finished running but failed""" return self.status in ["FAILED", "ERROR"] @property @Submitted.required def order_id(self): """The Order ID""" return self.response.id @property @Submitted.required def status(self): """State of the order process: 'RUNNING', 'SUCCEEDED, or 'FAILED'""" if self.response.status == "RUNNING": response = self.get_order(self.order_id) self.response = OrderResponse(**response) return self.response.status @property def state(self): """Legacy version of `status`, will be deprecated in the future""" return self.status @classmethod def from_id(cls, order_id, session=None): """Create a Order object from an ID Parameters ---------- order_id: str Order ID to hydrate into a Order object session : Session object, optional Authenticated session, such as from get_client_session() Returns ------- Order""" response = cls.get_order(order_id, session) instance = cls() instance.response = OrderResponse(**response) return instance @classmethod def list_orders( cls, limit=None, starting_after="", ending_before="", start_date="", end_date="", filter=None, session=None, ): """Fetch user's orders Parameters ---------- limit: int or None, default None maximum number of orders to fetch, None (default) means unlimited starting_after: str the order_id after which further responses will be returned, paging forward ending_before: str the order_id before which further responses will be returned, paging backwards start_date: str starting date to filter, ISO-8601 YYYY-MM-DD start_date: str ending date to filter, ISO-8601 YYYY-MM-DD filter: str filter results that match values contained in the given key separated by a colon. Example: 'metadata.downstream_customer_id:abdc-534-b4dc47' session: Session object, optional Authenticated session, such as from get_client_session() Returns ------- list Order objects matching parameters""" session = session or get_user_session() params = { "starting_after": starting_after, "ending_before": ending_before, "start_date": start_date, "end_date": end_date, } if filter is not None: params["filter"] = filter key = lambda order: order["id"] responses = paginated_response(session, ard_url("order"), key, limit, **params) return hydrate_with_responses(Order, OrderResponse, responses, session=session) @classmethod def get_order(cls, order_id, session=None): """Fetch raw data about an Order from an ID Parameters ---------- order_id : str Order ID to fetch metadata for session: Session object, optional Authenticated session, such as from get_client_session() Returns ------- dict API data for the given Order""" if not session: session = get_user_session() r = session.get(ard_url("order", "status", order_id)) return r.json() @classmethod def send_order(cls, payload, session=None): """Send a request to the Order API Parameters ---------- payload : dict Order API request payload session : Session object, optional Authenticated session, such as from get_client_session() Returns ------- dict API response data for the given Order""" if not session: session = get_user_session() r = session.post(ard_url("order"), json=payload) return r.json() def submit(self): """Submit this Order to the API""" response = self.send_order(self.request.to_payload(), session=self.session) if self.submitted: warnings.warn("The Order has already been submitted") return if self.dry_run: self.response = OrderResponse( id="dry run", status="SUCCEEDED", order={"usage": response["usage"]}, status_message="Order dry-run validation successful.", ) else: self.response = OrderResponse( id=response["id"], status="RUNNING", status_message="Order submitted and running." )
Ancestors
- Submitted
- abc.ABC
Static methods
def from_id(order_id, session=None)
-
Create a Order object from an ID
Parameters
order_id
:str
- Order ID to hydrate into a Order object
session
:Session object
, optional- Authenticated session, such as from get_client_session()
Returns
Expand source code
@classmethod def from_id(cls, order_id, session=None): """Create a Order object from an ID Parameters ---------- order_id: str Order ID to hydrate into a Order object session : Session object, optional Authenticated session, such as from get_client_session() Returns ------- Order""" response = cls.get_order(order_id, session) instance = cls() instance.response = OrderResponse(**response) return instance
def get_order(order_id, session=None)
-
Fetch raw data about an Order from an ID
Parameters
order_id
:str
- Order ID to fetch metadata for
session
:Session object
, optional- Authenticated session, such as from get_client_session()
Returns
dict
- API data for the given Order
Expand source code
@classmethod def get_order(cls, order_id, session=None): """Fetch raw data about an Order from an ID Parameters ---------- order_id : str Order ID to fetch metadata for session: Session object, optional Authenticated session, such as from get_client_session() Returns ------- dict API data for the given Order""" if not session: session = get_user_session() r = session.get(ard_url("order", "status", order_id)) return r.json()
def list_orders(limit=None, starting_after='', ending_before='', start_date='', end_date='', filter=None, session=None)
-
Fetch user's orders
Parameters
limit
:int
orNone
, defaultNone
- maximum number of orders to fetch, None (default) means unlimited
starting_after
:str
- the order_id after which further responses will be returned, paging forward
ending_before
:str
- the order_id before which further responses will be returned, paging backwards
start_date
:str
- starting date to filter, ISO-8601 YYYY-MM-DD
start_date
:str
- ending date to filter, ISO-8601 YYYY-MM-DD
filter
:str
- filter results that match values contained in the given key separated by a colon. Example: 'metadata.downstream_customer_id:abdc-534-b4dc47'
session
:Session object
, optional- Authenticated session, such as from get_client_session()
Returns
list
- Order objects matching parameters
Expand source code
@classmethod def list_orders( cls, limit=None, starting_after="", ending_before="", start_date="", end_date="", filter=None, session=None, ): """Fetch user's orders Parameters ---------- limit: int or None, default None maximum number of orders to fetch, None (default) means unlimited starting_after: str the order_id after which further responses will be returned, paging forward ending_before: str the order_id before which further responses will be returned, paging backwards start_date: str starting date to filter, ISO-8601 YYYY-MM-DD start_date: str ending date to filter, ISO-8601 YYYY-MM-DD filter: str filter results that match values contained in the given key separated by a colon. Example: 'metadata.downstream_customer_id:abdc-534-b4dc47' session: Session object, optional Authenticated session, such as from get_client_session() Returns ------- list Order objects matching parameters""" session = session or get_user_session() params = { "starting_after": starting_after, "ending_before": ending_before, "start_date": start_date, "end_date": end_date, } if filter is not None: params["filter"] = filter key = lambda order: order["id"] responses = paginated_response(session, ard_url("order"), key, limit, **params) return hydrate_with_responses(Order, OrderResponse, responses, session=session)
def send_order(payload, session=None)
-
Send a request to the Order API
Parameters
payload
:dict
- Order API request payload
session
:Session object
, optional- Authenticated session, such as from get_client_session()
Returns
dict
- API response data for the given Order
Expand source code
@classmethod def send_order(cls, payload, session=None): """Send a request to the Order API Parameters ---------- payload : dict Order API request payload session : Session object, optional Authenticated session, such as from get_client_session() Returns ------- dict API response data for the given Order""" if not session: session = get_user_session() r = session.post(ard_url("order"), json=payload) return r.json()
Instance variables
var failed
-
The Order has finished running but failed
Expand source code
@property @Submitted.required def failed(self): """The Order has finished running but failed""" return self.status in ["FAILED", "ERROR"]
var finished
-
The Order finished processing but may have failed
Expand source code
@property @Submitted.required def finished(self): """The Order finished processing but may have failed""" return self.status != "RUNNING"
var order_id
-
The Order ID
Expand source code
@property @Submitted.required def order_id(self): """The Order ID""" return self.response.id
var running
-
The Order is running
Expand source code
@property @Submitted.required def running(self): """The Order is running""" return self.status == "RUNNING"
var state
-
Legacy version of
status
, will be deprecated in the futureExpand source code
@property def state(self): """Legacy version of `status`, will be deprecated in the future""" return self.status
var status
-
State of the order process: 'RUNNING', 'SUCCEEDED, or 'FAILED'
Expand source code
@property @Submitted.required def status(self): """State of the order process: 'RUNNING', 'SUCCEEDED, or 'FAILED'""" if self.response.status == "RUNNING": response = self.get_order(self.order_id) self.response = OrderResponse(**response) return self.response.status
var submitted
-
Expand source code
@property def submitted(self): if self.response: return self.response.id is not None else: return False
var succeeded
-
The Order has finished running and has succeeded
Expand source code
@property @Submitted.required def succeeded(self): """The Order has finished running and has succeeded""" return self.status == "SUCCEEDED"
var usage
-
Expand source code
@property @Submitted.required def usage(self): if self.response.order is None: # just submitted, refresh the order status response = self.get_order(self.order_id) self.response = OrderResponse(**response) return self.response.order.usage
Methods
def add_email_notification(self, address)
-
Add an email notification to the order
Parameters
address
:str
- Email address to receive order notifications
Expand source code
def add_email_notification(self, address): """Add an email notification to the order Parameters ---------- address : str Email address to receive order notifications""" self.request.notifications.append(EmailNotification(address=address))
def add_sns_notification(self, topic_arn)
-
Add an AWS SNS notification topic to receive order notifications
Parameters
topic_arn
:str
- AWS SNS topic ARN to recieve order notifications
Expand source code
def add_sns_notification(self, topic_arn): """Add an AWS SNS notification topic to receive order notifications Parameters ---------- topic_arn : str AWS SNS topic ARN to recieve order notifications""" self.request.notifications.append(SNSNotification(topic_arn=topic_arn))
def submit(self)
-
Submit this Order to the API
Expand source code
def submit(self): """Submit this Order to the API""" response = self.send_order(self.request.to_payload(), session=self.session) if self.submitted: warnings.warn("The Order has already been submitted") return if self.dry_run: self.response = OrderResponse( id="dry run", status="SUCCEEDED", order={"usage": response["usage"]}, status_message="Order dry-run validation successful.", ) else: self.response = OrderResponse( id=response["id"], status="RUNNING", status_message="Order submitted and running." )
class Select (acq_ids=None, datetime=None, intersects=None, bbox=None, query={}, stack_depth=None, image_age_category=None, session=None)
-
An ARD API Select object
Parameters
acq_ids
:iterable
ofstr
- An iterable of acquisition IDs to search for
datetime
:str
- Date or date range string
intersects
:Geometry-like objects, str (optional))
- Geometry to intersect, can be most geometry or spatial objects, or a path (see Notes)
bbox
:interable
ofnumeric
- Bounding box in WGS84 coordinates, [west, south, east, north]
query
:dict
- Query dictionary
stack_depth
:int
orNone
, optional- Maximum number of tiles to return
image_age_category
:iterable
ofstr
, optional- One or
session
:max_ard.session object
, optional- A user_session or application_session object
Attributes
session
:max_ard.session object
orNone
- A user_session or application_session object
submitted
:bool
- True if Select has been submitted via
Select.submit()
request
:Pydantic model
- Parameters are loaded into a Pydantic model of the HTTP API request
response
:Pydantic model
- Pydantic model of the server response to API call. Status and submit calls return the same payload
Notes
Intersects inputs: Geometry objects: Shapely shapes, objects supporting geo_interface, geojson-like dicts, geojson and wkt strings Geometry iterables: iterables of above, Fiona readers File paths: most spatial file formats. WKT and Geojson supported with base install, other formats require Fiona for reading
Expand source code
class Select(Succeeded, Submitted): """An ARD API Select object Parameters ---------- acq_ids : iterable of str An iterable of acquisition IDs to search for datetime : str Date or date range string intersects : Geometry-like objects, str (optional)) Geometry to intersect, can be most geometry or spatial objects, or a path (see Notes) bbox : interable of numeric Bounding box in WGS84 coordinates, [west, south, east, north] query : dict Query dictionary stack_depth: int or None, optional Maximum number of tiles to return image_age_category : iterable of str, optional One or session : session object, optional A user_session or application_session object Attributes ---------- session : session object or None A user_session or application_session object submitted : bool True if Select has been submitted via `max_ard.select.Select.submit` request : Pydantic model Parameters are loaded into a Pydantic model of the HTTP API request response : Pydantic model Pydantic model of the server response to API call. Status and submit calls return the same payload Notes ----- Intersects inputs: Geometry objects: Shapely shapes, objects supporting __geo_interface__, geojson-like dicts, geojson and wkt strings Geometry iterables: iterables of above, Fiona readers File paths: most spatial file formats. WKT and Geojson supported with base install, other formats require Fiona for reading""" # store an authenticated Requests session in the class def __init__( self, acq_ids=None, datetime=None, intersects=None, bbox=None, query={}, stack_depth=None, image_age_category=None, session=None, ): self.session = session or get_user_session() if intersects is not None: intersects = convert_to_shapely(intersects).wkt self.request = SelectRequest( ids=acq_ids, datetime=datetime, intersects=intersects, bbox=bbox, stack_depth=stack_depth, query=query, image_age_category=image_age_category, ) self.response = None @property def submitted(self): if self.response: return self.response.id is not None else: return False @property @Submitted.required def running(self): """The Select is currently running""" return self.status == "RUNNING" @property @Submitted.required def finished(self): """The Select has finished running but may have failed""" return self.status != "RUNNING" @property @Submitted.required def succeeded(self): """The Select has finished running and has succeeded""" return self.status == "SUCCEEDED" @property @Submitted.required def failed(self): """The Select has finished running but has failed""" return self.status in ["FAILED", "ERROR"] @Submitted.required def wait_for_success(self, interval: int = 5) -> None: """Wait for the Select to succeed Parameters ---------- interval: numeric, optional polling interval for success, default is 5 secs Raises ------ SelectError An error in the selection caused it to fail""" while self.status == "RUNNING": sleep(interval) if self.status == "FAILED": error = self.response.error_message msg = f'{error["Error"]}: {error["Cause"]}' raise SelectError(msg) @property @Submitted.required def status(self): """Status of the select process: 'RUNNING', 'FINISHED', or 'FAILED'""" if self.response.status == "RUNNING": response = self.get_select(self.select_id, self.session) self.response = SelectResponse(**response) return self.response.status @property def state(self): """Alternate name for `max_ard.Select.status`, will be deprecated""" return self.status @property @Submitted.required def select_id(self): """ID of the Select""" return self.response.id @property @Succeeded.required def usage(self): """Dictionary of data usage metrics""" return self.response.usage @classmethod def from_id(cls, select_id: str, session=None): """Create a Select object from an ID Parameters ---------- select_id: str Select ID to hydrate into a Select object session : Session object, optional Authenticated session, such as from get_client_session() Returns ------- Select""" if not session: session = get_user_session() instance = cls() instance.session = session response = cls.get_select(select_id, session) instance.response = SelectResponse(**response) if "request_details" in response: instance.request = SelectRequest(**response["request_details"]) return instance @classmethod def get_select(cls, select_id, session=None): """Fetch raw data about a Select from an ID Parameters ---------- select_id : str Select ID to fetch metadata for session: Session object, optional Authenticated session, such as from get_client_session() Returns ------- dict API data for the given Select""" if not session: session = get_user_session() r = session.get(ard_url("select", "request", select_id)) return r.json() @classmethod def send_select(cls, payload, session=None): """Send a request to the Select API Parameters ---------- payload : dict Select API request payload session : Session object, optional Authenticated session, such as from get_client_session() Returns ------- dict API response data for the given Select""" if not session: session = get_user_session() r = session.post(ard_url("select"), json=payload) return r.json() def submit(self): """Submit this Select to the API""" response = self.send_select(self.request.to_payload(), self.session) if self.submitted: warnings.warn("The Select has already been submitted") return self.response = SelectResponse(**response) # update request in case defaults were applied self.request = SelectRequest(**self.response.request_details) @lru_cache() @Succeeded.required def get_link_contents(self, name): """Get the contents of an Select result file via its signed link Parameters ---------- name : str The Select result file name Returns ------- str Link contents""" temp_url = self.get_signed_link(name) # unauthenticated, don't send token r = requests.get(temp_url) r.raise_for_status() return r.text @Succeeded.required def get_signed_link(self, name): """Get the signed link for a Select result file Parameters ---------- name : str The Select result file name Returns ------- str signed URL""" url = self.response.links[name] r = self.session.get(url) r.raise_for_status() return r.json()["download_link"] @Succeeded.required def copy_file(self, name, dir="."): """Copy a Select result file to a local location Parameters ---------- name : str The Select result file name dir : str, optional Local directory location to copy to, file will retain its name""" # TODO get the output filename path = Path(dir, f"{self.select_id}.{name}") with open(path, "w") as out: file = self.get_link_contents(name) out.write(file) @property @lru_cache() @Succeeded.required def results(self): """The results of a select converted to Python objects""" return SelectResult.from_geojson(json.loads(self.get_link_contents("geojson"))) def __repr__(self) -> str: if not self.submitted: return f"<ARD Select (unsubmitted)>" elif self.succeeded: return f"<ARD Select {self.select_id}>" else: return f"<ARD Select ({self.status})>"
Ancestors
Static methods
def from_id(select_id: str, session=None)
-
Create a Select object from an ID
Parameters
select_id
:str
- Select ID to hydrate into a Select object
session
:Session object
, optional- Authenticated session, such as from get_client_session()
Returns
Expand source code
@classmethod def from_id(cls, select_id: str, session=None): """Create a Select object from an ID Parameters ---------- select_id: str Select ID to hydrate into a Select object session : Session object, optional Authenticated session, such as from get_client_session() Returns ------- Select""" if not session: session = get_user_session() instance = cls() instance.session = session response = cls.get_select(select_id, session) instance.response = SelectResponse(**response) if "request_details" in response: instance.request = SelectRequest(**response["request_details"]) return instance
def get_select(select_id, session=None)
-
Fetch raw data about a Select from an ID
Parameters
select_id
:str
- Select ID to fetch metadata for
session
:Session object
, optional- Authenticated session, such as from get_client_session()
Returns
dict
- API data for the given Select
Expand source code
@classmethod def get_select(cls, select_id, session=None): """Fetch raw data about a Select from an ID Parameters ---------- select_id : str Select ID to fetch metadata for session: Session object, optional Authenticated session, such as from get_client_session() Returns ------- dict API data for the given Select""" if not session: session = get_user_session() r = session.get(ard_url("select", "request", select_id)) return r.json()
def send_select(payload, session=None)
-
Send a request to the Select API
Parameters
payload
:dict
- Select API request payload
session
:Session object
, optional- Authenticated session, such as from get_client_session()
Returns
dict
- API response data for the given Select
Expand source code
@classmethod def send_select(cls, payload, session=None): """Send a request to the Select API Parameters ---------- payload : dict Select API request payload session : Session object, optional Authenticated session, such as from get_client_session() Returns ------- dict API response data for the given Select""" if not session: session = get_user_session() r = session.post(ard_url("select"), json=payload) return r.json()
Instance variables
var failed
-
The Select has finished running but has failed
Expand source code
@property @Submitted.required def failed(self): """The Select has finished running but has failed""" return self.status in ["FAILED", "ERROR"]
var finished
-
The Select has finished running but may have failed
Expand source code
@property @Submitted.required def finished(self): """The Select has finished running but may have failed""" return self.status != "RUNNING"
var results
-
The results of a select converted to Python objects
Expand source code
@property @lru_cache() @Succeeded.required def results(self): """The results of a select converted to Python objects""" return SelectResult.from_geojson(json.loads(self.get_link_contents("geojson")))
var running
-
The Select is currently running
Expand source code
@property @Submitted.required def running(self): """The Select is currently running""" return self.status == "RUNNING"
var select_id
-
ID of the Select
Expand source code
@property @Submitted.required def select_id(self): """ID of the Select""" return self.response.id
var state
-
Alternate name for
Select.status
, will be deprecatedExpand source code
@property def state(self): """Alternate name for `max_ard.Select.status`, will be deprecated""" return self.status
var status
-
Status of the select process: 'RUNNING', 'FINISHED', or 'FAILED'
Expand source code
@property @Submitted.required def status(self): """Status of the select process: 'RUNNING', 'FINISHED', or 'FAILED'""" if self.response.status == "RUNNING": response = self.get_select(self.select_id, self.session) self.response = SelectResponse(**response) return self.response.status
var submitted
-
Expand source code
@property def submitted(self): if self.response: return self.response.id is not None else: return False
var succeeded
-
The Select has finished running and has succeeded
Expand source code
@property @Submitted.required def succeeded(self): """The Select has finished running and has succeeded""" return self.status == "SUCCEEDED"
var usage
-
Dictionary of data usage metrics
Expand source code
@property @Succeeded.required def usage(self): """Dictionary of data usage metrics""" return self.response.usage
Methods
def copy_file(self, name, dir='.')
-
Copy a Select result file to a local location
Parameters
name
:str
- The Select result file name
dir
:str
, optional- Local directory location to copy to, file will retain its name
Expand source code
@Succeeded.required def copy_file(self, name, dir="."): """Copy a Select result file to a local location Parameters ---------- name : str The Select result file name dir : str, optional Local directory location to copy to, file will retain its name""" # TODO get the output filename path = Path(dir, f"{self.select_id}.{name}") with open(path, "w") as out: file = self.get_link_contents(name) out.write(file)
def get_link_contents(self, name)
-
Get the contents of an Select result file via its signed link
Parameters
name
:str
- The Select result file name
Returns
str
- Link contents
Expand source code
@lru_cache() @Succeeded.required def get_link_contents(self, name): """Get the contents of an Select result file via its signed link Parameters ---------- name : str The Select result file name Returns ------- str Link contents""" temp_url = self.get_signed_link(name) # unauthenticated, don't send token r = requests.get(temp_url) r.raise_for_status() return r.text
def get_signed_link(self, name)
-
Get the signed link for a Select result file
Parameters
name
:str
- The Select result file name
Returns
str
- signed URL
Expand source code
@Succeeded.required def get_signed_link(self, name): """Get the signed link for a Select result file Parameters ---------- name : str The Select result file name Returns ------- str signed URL""" url = self.response.links[name] r = self.session.get(url) r.raise_for_status() return r.json()["download_link"]
def submit(self)
-
Submit this Select to the API
Expand source code
def submit(self): """Submit this Select to the API""" response = self.send_select(self.request.to_payload(), self.session) if self.submitted: warnings.warn("The Select has already been submitted") return self.response = SelectResponse(**response) # update request in case defaults were applied self.request = SelectRequest(**self.response.request_details)
def wait_for_success(self, interval: int = 5) ‑> None
-
Wait for the Select to succeed
Parameters
interval
:numeric
, optional- polling interval for success, default is 5 secs
Raises
SelectError
- An error in the selection caused it to fail
Expand source code
@Submitted.required def wait_for_success(self, interval: int = 5) -> None: """Wait for the Select to succeed Parameters ---------- interval: numeric, optional polling interval for success, default is 5 secs Raises ------ SelectError An error in the selection caused it to fail""" while self.status == "RUNNING": sleep(interval) if self.status == "FAILED": error = self.response.error_message msg = f'{error["Error"]}: {error["Cause"]}' raise SelectError(msg)
class SelectResult
-
The results of a Select or MetaSelect operation.
This object is converted from the GeoJSON FeatureCollections returned by Selects and MetaSelects into Python objects
Expand source code
class SelectResult(BaseCollection): """The results of a Select or MetaSelect operation. This object is converted from the GeoJSON FeatureCollections returned by Selects and MetaSelects into Python objects""" def __init__(self): super().__init__() @classmethod def from_geojson(cls, geojson): self = cls() for feature in geojson["features"]: if "best_matches" in feature["properties"]: for match in feature["properties"]["best_matches"]: self.add_tile(SelectTile(match)) else: try: self.add_tile(SelectTile(feature["properties"])) except (KeyError, TypeError): pass return self def __repr__(self) -> str: return ( f"<SelectResult ({len(self.tiles)} tiles in {len(self.acquisitions)} acquisitions) >" )
Ancestors
Static methods
def from_geojson(geojson)
-
Expand source code
@classmethod def from_geojson(cls, geojson): self = cls() for feature in geojson["features"]: if "best_matches" in feature["properties"]: for match in feature["properties"]["best_matches"]: self.add_tile(SelectTile(match)) else: try: self.add_tile(SelectTile(feature["properties"])) except (KeyError, TypeError): pass return self
Inherited members