Source code for globus_sdk.services.flows.client

from __future__ import annotations

import logging
import typing as t

from globus_sdk import GlobusHTTPResponse, client, paging, scopes, utils
from globus_sdk._types import UUIDLike
from globus_sdk.authorizers import GlobusAuthorizer
from globus_sdk.scopes import ScopeBuilder
from globus_sdk.utils import MISSING, MissingType

from .errors import FlowsAPIError
from .response import (
    IterableFlowsResponse,
    IterableRunLogsResponse,
    IterableRunsResponse,
)
from .scopes import SpecificFlowScopesClassStub

log = logging.getLogger(__name__)


[docs] class FlowsClient(client.BaseClient): r""" Client for the Globus Flows API. .. automethodlist:: globus_sdk.FlowsClient """ error_class = FlowsAPIError service_name = "flows" scopes = scopes.FlowsScopes
[docs] def create_flow( self, title: str, definition: dict[str, t.Any], input_schema: dict[str, t.Any], subtitle: str | None = None, description: str | None = None, flow_viewers: list[str] | None = None, flow_starters: list[str] | None = None, flow_administrators: list[str] | None = None, keywords: list[str] | None = None, subscription_id: UUIDLike | None = None, additional_fields: dict[str, t.Any] | None = None, ) -> GlobusHTTPResponse: """ Create a Flow :param title: A non-unique, human-friendly name used for displaying the flow to end users. (1 - 128 characters) :param definition: JSON object specifying flows states and execution order. For a more detailed explanation of the flow definition, see `Authoring Flows <https://docs.globus.org/api/flows/authoring-flows>`_ :param input_schema: A JSON Schema to which Flow Invocation input must conform :param subtitle: A concise summary of the flow’s purpose. (0 - 128 characters) :param description: A detailed description of the flow's purpose for end user display. (0 - 4096 characters) :param flow_viewers: A set of Principal URN values, or the value "public", indicating entities who can view the flow .. dropdown:: Example Values .. code-block:: json [ "public" ] .. code-block:: json [ "urn:globus:auth:identity:b44bddda-d274-11e5-978a-9f15789a8150", "urn:globus:groups:id:c1dcd951-3f35-4ea3-9f28-a7cdeaf8b68f" ] :param flow_starters: A set of Principal URN values, or the value "all_authenticated_users", indicating entities who can initiate a *Run* of the flow .. dropdown:: Example Values .. code-block:: json [ "all_authenticated_users" ] .. code-block:: json [ "urn:globus:auth:identity:b44bddda-d274-11e5-978a-9f15789a8150", "urn:globus:groups:id:c1dcd951-3f35-4ea3-9f28-a7cdeaf8b68f" ] :param flow_administrators: A set of Principal URN values indicating entities who can perform administrative operations on the flow (create, delete, update) .. dropdown:: Example Values .. code-block:: json [ "urn:globus:auth:identity:b44bddda-d274-11e5-978a-9f15789a8150", "urn:globus:groups:id:c1dcd951-3f35-4ea3-9f28-a7cdeaf8b68f" ] :param keywords: A set of terms used to categorize the flow used in query and discovery operations (0 - 1024 items) :param subscription_id: The ID of the subscription to associate with the flow, marking as a subscription tier flow. :param additional_fields: Additional Key/Value pairs sent to the create API .. tab-set:: .. tab-item:: Example Usage .. code-block:: python from globus_sdk import FlowsClient ... flows = FlowsClient(...) flows.create_flow( title="my-cool-flow", definition={ "StartAt": "the-one-true-state", "States": {"the-one-true-state": {"Type": "Pass", "End": True}}, }, input_schema={ "type": "object", "properties": { "input-a": {"type": "string"}, "input-b": {"type": "number"}, "input-c": {"type": "boolean"}, }, }, ) .. tab-item:: Example Response Data .. expandtestfixture:: flows.create_flow .. tab-item:: API Info .. extdoclink:: Create Flow :service: flows :ref: Flows/paths/~1flows/post """ # noqa E501 data = { k: v for k, v in { "title": title, "definition": definition, "input_schema": input_schema, "subtitle": subtitle, "description": description, "flow_viewers": flow_viewers, "flow_starters": flow_starters, "flow_administrators": flow_administrators, "keywords": keywords, "subscription_id": subscription_id, }.items() if v is not None } data.update(additional_fields or {}) return self.post("/flows", data=data)
[docs] def get_flow( self, flow_id: UUIDLike, *, query_params: dict[str, t.Any] | None = None, ) -> GlobusHTTPResponse: """Retrieve a Flow by ID :param flow_id: The ID of the Flow to fetch :param query_params: Any additional parameters to be passed through as query params. .. tab-set:: .. tab-item:: API Info .. extdoclink:: Get Flow :service: flows :ref: Flows/paths/~1flows~1{flow_id}/get """ if query_params is None: query_params = {} return self.get(f"/flows/{flow_id}", query_params=query_params)
[docs] @paging.has_paginator(paging.MarkerPaginator, items_key="flows") def list_flows( self, *, filter_role: str | None = None, filter_fulltext: str | None = None, orderby: str | t.Iterable[str] | None = None, marker: str | None = None, query_params: dict[str, t.Any] | None = None, ) -> IterableFlowsResponse: """ List deployed Flows :param filter_role: A role name specifying the minimum permissions required for a Flow to be included in the response. :param filter_fulltext: A string to use in a full-text search to filter results :param orderby: A criterion for ordering flows in the listing :param marker: A marker for pagination :param query_params: Any additional parameters to be passed through as query params. **Role Values** The valid values for ``role`` are, in order of precedence for ``filter_role``: - ``flow_viewer`` - ``flow_starter`` - ``flow_administrator`` - ``flow_owner`` For example, if ``flow_starter`` is specified then flows for which the user has the ``flow_starter``, ``flow_administrator`` or ``flow_owner`` roles will be returned. **OrderBy Values** Values for ``orderby`` consist of a field name, a space, and an ordering mode -- ``ASC`` for "ascending" and ``DESC`` for "descending". Supported field names are - ``id`` - ``scope_string`` - ``flow_owners`` - ``flow_administrators`` - ``title`` - ``created_at`` - ``updated_at`` For example, ``orderby="updated_at DESC"`` requests a descending sort on update times, getting the most recently updated flow first. Multiple ``orderby`` values may be given as an iterable, e.g. ``orderby=["updated_at DESC", "title ASC"]``. .. tab-set:: .. tab-item:: Example Usage .. code-block:: python import json import textwrap from globus_sdk import FlowsClient flows = FlowsClient(...) my_frobulate_flows = flows.list_flows( filter_role="flow_owner", filter_fulltext="frobulate", orderby=("title ASC", "updated_at DESC"), ) for flow_doc in my_frobulate_flows: print(f"Title: {flow_doc['title']}") print(f"Description: {flow_doc['description']}") print("Definition:") print( textwrap.indent( json.dumps( flow_doc["definition"], indent=2, separators=(",", ": "), ), " ", ) ) print() .. tab-item:: Paginated Usage .. paginatedusage:: list_flows .. tab-item:: API Info .. extdoclink:: List Flows :service: flows :ref: Flows/paths/~1flows/get """ if query_params is None: query_params = {} if filter_role is not None: query_params["filter_role"] = filter_role if filter_fulltext is not None: query_params["filter_fulltext"] = filter_fulltext if orderby is not None: if isinstance(orderby, str): query_params["orderby"] = orderby else: # copy any input sequence to force the type to `list` which is known to # behave well # this also ensures that we will consume non-sequence iterables # (e.g. generator expressions) in a well-defined way query_params["orderby"] = list(orderby) if marker is not None: query_params["marker"] = marker return IterableFlowsResponse(self.get("/flows", query_params=query_params))
[docs] def update_flow( self, flow_id: UUIDLike, *, title: str | None = None, definition: dict[str, t.Any] | None = None, input_schema: dict[str, t.Any] | None = None, subtitle: str | None = None, description: str | None = None, flow_owner: str | None = None, flow_viewers: list[str] | None = None, flow_starters: list[str] | None = None, flow_administrators: list[str] | None = None, keywords: list[str] | None = None, subscription_id: UUIDLike | t.Literal["DEFAULT"] | MissingType = MISSING, additional_fields: dict[str, t.Any] | None = None, ) -> GlobusHTTPResponse: """ Update a Flow Only the parameter `flow_id` is required. Any fields omitted from the request will be unchanged :param flow_id: The ID of the Flow to fetch :param title: A non-unique, human-friendly name used for displaying the flow to end users. (1 - 128 characters) :param definition: JSON object specifying flows states and execution order. For a more detailed explanation of the flow definition, see `Authoring Flows <https://docs.globus.org/api/flows/authoring-flows>`_ :param input_schema: A JSON Schema to which Flow Invocation input must conform :param subtitle: A concise summary of the flow’s purpose. (0 - 128 characters) :param description: A detailed description of the flow's purpose for end user display. (0 - 4096 characters) :param flow_owner: An Auth Identity URN to set as flow owner; this must match the Identity URN of the entity calling `update_flow` :param flow_viewers: A set of Principal URN values, or the value "public", indicating entities who can view the flow .. dropdown:: Example Values .. code-block:: json [ "public" ] .. code-block:: json [ "urn:globus:auth:identity:b44bddda-d274-11e5-978a-9f15789a8150", "urn:globus:groups:id:c1dcd951-3f35-4ea3-9f28-a7cdeaf8b68f" ] :param flow_starters: A set of Principal URN values, or the value "all_authenticated_users", indicating entities who can initiate a *Run* of the flow .. dropdown:: Example Values .. code-block:: json [ "all_authenticated_users" ] .. code-block:: json [ "urn:globus:auth:identity:b44bddda-d274-11e5-978a-9f15789a8150", "urn:globus:groups:id:c1dcd951-3f35-4ea3-9f28-a7cdeaf8b68f" ] :param flow_administrators: A set of Principal URN values indicating entities who can perform administrative operations on the flow (create, delete, update) .. dropdown:: Example Value .. code-block:: json [ "urn:globus:auth:identity:b44bddda-d274-11e5-978a-9f15789a8150", "urn:globus:groups:id:c1dcd951-3f35-4ea3-9f28-a7cdeaf8b68f" ] :param keywords: A set of terms used to categorize the flow used in query and discovery operations (0 - 1024 items) :param subscription_id: A subscription ID to assign to the flow. :param additional_fields: Additional Key/Value pairs sent to the create API .. tab-set:: .. tab-item:: Example Usage .. code-block:: python from globus_sdk import FlowsClient flows = FlowsClient(...) flows.update_flow( flow_id="581753c7-45da-43d3-ad73-246b46e7cb6b", keywords=["new", "overriding", "keywords"], ) .. tab-item:: Example Response Data .. expandtestfixture:: flows.update_flow .. tab-item:: API Info .. extdoclink:: Update Flow :service: flows :ref: Flows/paths/~1flows~1{flow_id}/put """ # noqa E501 data = { k: v for k, v in { "title": title, "definition": definition, "input_schema": input_schema, "subtitle": subtitle, "description": description, "flow_owner": flow_owner, "flow_viewers": flow_viewers, "flow_starters": flow_starters, "flow_administrators": flow_administrators, "keywords": keywords, "subscription_id": subscription_id, }.items() if v is not None and v is not MISSING } data.update(additional_fields or {}) return self.put(f"/flows/{flow_id}", data=data)
[docs] def delete_flow( self, flow_id: UUIDLike, *, query_params: dict[str, t.Any] | None = None, ) -> GlobusHTTPResponse: """Delete a Flow :param flow_id: The ID of the flow to delete :param query_params: Any additional parameters to be passed through as query params. .. tab-set:: .. tab-item:: API Info .. extdoclink:: Delete Flow :service: flows :ref: Flows/paths/~1flows~1{flow_id}/delete """ if query_params is None: query_params = {} return self.delete(f"/flows/{flow_id}", query_params=query_params)
[docs] @paging.has_paginator(paging.MarkerPaginator, items_key="runs") def list_runs( self, *, filter_flow_id: t.Iterable[UUIDLike] | UUIDLike | None = None, marker: str | None = None, query_params: dict[str, t.Any] | None = None, ) -> IterableRunsResponse: """ List all runs. :param filter_flow_id: One or more Flow IDs used to filter the results :param marker: A pagination marker, used to get the next page of results. :param query_params: Any additional parameters to be passed through .. tab-set:: .. tab-item:: Example Usage .. code-block:: python flows = globus_sdk.FlowsClient(...) for run in flows.list_runs(): print(run["run_id"]) .. tab-item:: Example Response Data .. expandtestfixture:: flows.list_runs .. tab-item:: API Info .. extdoclink:: List Runs :service: flows :ref: Runs/paths/~1runs/get """ if query_params is None: query_params = {} if filter_flow_id is not None: query_params["filter_flow_id"] = ",".join( utils.safe_strseq_iter(filter_flow_id) ) if marker is not None: query_params["marker"] = marker return IterableRunsResponse(self.get("/runs", query_params=query_params))
[docs] @paging.has_paginator(paging.MarkerPaginator, items_key="entries") def get_run_logs( self, run_id: UUIDLike, *, limit: int | None = None, reverse_order: bool | None = None, marker: str | None = None, query_params: dict[str, t.Any] | None = None, ) -> IterableRunLogsResponse: """ Retrieve the execution logs associated with a run These logs describe state transitions and associated payloads for a run :param run_id: Run ID to retrieve logs for :param limit: Maximum number of log entries to return (server default: 10) (value between 1 and 100 inclusive) :param reverse_order: Return results in reverse chronological order (server default: false) :param marker: Marker for the next page of results (provided by the server) :param query_params: Any additional parameters to be passed through .. tab-set:: .. tab-item:: Paginated Usage .. paginatedusage:: get_run_logs .. tab-item:: Example Response Data .. expandtestfixture:: flows.get_run_logs .. tab-item:: API Info .. extdoclink:: Get Run Logs :service: flows :ref: Runs/paths/~1runs~1{action_id}~1log/get """ query_params = { "limit": limit, "reverse_order": reverse_order, "marker": marker, **(query_params or {}), } # Filter out request keys with None values to allow server defaults query_params = {k: v for k, v in query_params.items() if v is not None} return IterableRunLogsResponse( self.get(f"/runs/{run_id}/log", query_params=query_params) )
[docs] def get_run( self, run_id: UUIDLike, *, include_flow_description: bool | None = None, query_params: dict[str, t.Any] | None = None, ) -> GlobusHTTPResponse: """ Retrieve information about a particular Run of a Flow :param run_id: The ID of the run to get :param include_flow_description: If set to true, the lookup will attempt to attach metadata about the flow to the run to the run response under the key "flow_description" (default: False) :param query_params: Any additional parameters to be passed through .. tab-set:: .. tab-item:: Example Usage .. code-block:: python from globus_sdk import FlowsClient flows = FlowsClient(...) flows.get_run("581753c7-45da-43d3-ad73-246b46e7cb6b") .. tab-item:: Example Response Data .. expandtestfixture:: flows.get_run .. tab-item:: API Info .. extdoclink:: Get Run :service: flows :ref: Flows/paths/~1runs~1{run_id}/get """ query_params = query_params or {} if include_flow_description is not None: query_params["include_flow_description"] = include_flow_description return self.get(f"/runs/{run_id}", query_params=query_params)
[docs] def get_run_definition( self, run_id: UUIDLike, ) -> GlobusHTTPResponse: """ Get the flow definition and input schema at the time the run was started. :param run_id: The ID of the run to get .. tab-set:: .. tab-item:: Example Usage .. code-block:: python from globus_sdk import FlowsClient flows = FlowsClient(...) flows.get_run_definition("581753c7-45da-43d3-ad73-246b46e7cb6b") .. tab-item:: Example Response Data .. expandtestfixture:: flows.get_run_definition .. tab-item:: API Info .. extdoclink:: Get Run Definition :service: flows :ref: Flows/paths/~1runs~1{run_id}~1definition/get """ return self.get(f"/runs/{run_id}/definition")
[docs] def cancel_run(self, run_id: UUIDLike) -> GlobusHTTPResponse: """ Cancel a run. :param run_id: The ID of the run to cancel .. tab-set:: .. tab-item:: Example Usage .. code-block:: python from globus_sdk import FlowsClient flows = FlowsClient(...) flows.cancel_run("581753c7-45da-43d3-ad73-246b46e7cb6b") .. tab-item:: Example Response Data .. expandtestfixture:: flows.cancel_run .. tab-item:: API Info .. extdoclink:: Cancel Run :service: flows :ref: Runs/paths/~1runs~1{run_id}~1cancel/post """ return self.post(f"/runs/{run_id}/cancel")
[docs] def update_run( self, run_id: UUIDLike, *, label: str | None = None, tags: list[str] | None = None, run_monitors: list[str] | None = None, run_managers: list[str] | None = None, additional_fields: dict[str, t.Any] | None = None, ) -> GlobusHTTPResponse: """ Update the metadata of a specific run. :param run_id: The ID of the run to update :param label: A short human-readable title (1 - 64 chars) :param tags: A collection of searchable tags associated with the run. Tags are normalized by stripping leading and trailing whitespace, and replacing all whitespace with a single space. :param run_monitors: A list of authenticated entities (identified by URN) authorized to view this run in addition to the run owner :param run_managers: A list of authenticated entities (identified by URN) authorized to view & cancel this run in addition to the run owner :param additional_fields: Additional Key/Value pairs sent to the run API (this parameter is used to bypass local sdk key validation helping) .. tab-set:: .. tab-item:: Example Usage .. code-block:: python from globus_sdk import FlowsClient flows = FlowsClient(...) flows.update_run( "581753c7-45da-43d3-ad73-246b46e7cb6b", label="Crunch numbers for experiment xDA202-batch-10", ) .. tab-item:: Example Response Data .. expandtestfixture:: flows.update_run .. tab-item:: API Info .. extdoclink:: Update Run :service: flows :ref: Runs/paths/~1runs~1{run_id}/put """ data = { k: v for k, v in { "tags": tags, "label": label, "run_monitors": run_monitors, "run_managers": run_managers, }.items() if v is not None } data.update(additional_fields or {}) return self.put(f"/runs/{run_id}", data=data)
[docs] def delete_run(self, run_id: UUIDLike) -> GlobusHTTPResponse: """ Delete a run. :param run_id: The ID of the run to delete .. tab-set:: .. tab-item:: Example Usage .. code-block:: python from globus_sdk import FlowsClient flows = FlowsClient(...) flows.delete_run("581753c7-45da-43d3-ad73-246b46e7cb6b") .. tab-item:: Example Response Data .. expandtestfixture:: flows.delete_run .. tab-item:: API Info .. extdoclink:: Delete Run :service: flows :ref: Runs/paths/~1runs~1{run_id}~1release/post """ return self.post(f"/runs/{run_id}/release")
[docs] class SpecificFlowClient(client.BaseClient): r""" Client for interacting with a specific Globus Flow through the Flows API. Unlike other client types, this must be provided with a specific flow id. All other arguments are the same as those for :class:`~globus_sdk.BaseClient`. :param flow_id: The generated UUID associated with a flow .. automethodlist:: globus_sdk.SpecificFlowClient """ error_class = FlowsAPIError service_name = "flows" scopes: ScopeBuilder = SpecificFlowScopesClassStub() def __init__( self, flow_id: UUIDLike, *, environment: str | None = None, authorizer: GlobusAuthorizer | None = None, app_name: str | None = None, transport_params: dict[str, t.Any] | None = None, ): super().__init__( environment=environment, authorizer=authorizer, app_name=app_name, transport_params=transport_params, ) self._flow_id = flow_id user_scope_value = f"flow_{str(flow_id).replace('-', '_')}_user" self.scopes = ScopeBuilder( resource_server=str(self._flow_id), known_url_scopes=[("user", user_scope_value)], )
[docs] def run_flow( self, body: dict[str, t.Any], *, label: str | None = None, tags: list[str] | None = None, run_monitors: list[str] | None = None, run_managers: list[str] | None = None, additional_fields: dict[str, t.Any] | None = None, ) -> GlobusHTTPResponse: """ :param body: The input json object handed to the first flow state. The flows service will validate this object against the flow's supplied input schema. :param label: A short human-readable title (1 - 64 chars) :param tags: A collection of searchable tags associated with the run. Tags are normalized by stripping leading and trailing whitespace, and replacing all whitespace with a single space. :param run_monitors: A list of authenticated entities (identified by URN) authorized to view this run in addition to the run owner :param run_managers: A list of authenticated entities (identified by URN) authorized to view & cancel this run in addition to the run owner :param additional_fields: Additional Key/Value pairs sent to the run API (this parameter is used to bypass local sdk key validation helping) .. tab-set:: .. tab-item:: API Info .. extdoclink:: Run Flow :service: flows :ref: ~1flows~1{flow_id}~1run/post """ data = { k: v for k, v in { "body": body, "tags": tags, "label": label, "run_monitors": run_monitors, "run_managers": run_managers, }.items() if v is not None } data.update(additional_fields or {}) return self.post(f"/flows/{self._flow_id}/run", data=data)
[docs] def resume_run(self, run_id: UUIDLike) -> GlobusHTTPResponse: """ :param run_id: The ID of the run to resume .. tab-set:: .. tab-item:: Example Usage .. code-block:: python from globus_sdk import SpecificFlowClient ... flow = SpecificFlowClient(flow_id, ...) flow.resume_run(run_id) .. tab-item:: Example Response Data .. expandtestfixture:: flows.resume_run .. tab-item:: API Info .. extdoclink:: Resume Run :service: flows :ref: Runs/paths/~1flows~1{flow_id}~1runs~1{run_id}~1resume/post """ return self.post(f"/runs/{run_id}/resume")