Create & Run a Flow#

These examples guide you through creating and running a flow using the Globus Flows service.

Note that users are restricted to only creating one flow unless covered by a Globus subscription. Therefore, these examples will also include an option to delete your flow.

Create and Delete Hello World Flow#

This script provides commands to create, list, and delete your flows. The flow definition used for it is simple and baked into the script, but this demonstrates the minimal flow creation and deletion process.

manage_flow_minimal.py [download]#
#!/usr/bin/env python

import argparse
import os
import sys

import globus_sdk
from globus_sdk.tokenstorage import SimpleJSONFileAdapter

MY_FILE_ADAPTER = SimpleJSONFileAdapter(os.path.expanduser("~/.sdk-manage-flow.json"))

SCOPES = [globus_sdk.FlowsClient.scopes.manage_flows]
RESOURCE_SERVER = globus_sdk.FlowsClient.resource_server

# tutorial client ID
# we recommend replacing this with your own client for any production use-cases
CLIENT_ID = "61338d24-54d5-408f-a10d-66c06b59f6d2"

NATIVE_CLIENT = globus_sdk.NativeAppAuthClient(CLIENT_ID)


def do_login_flow():
    NATIVE_CLIENT.oauth2_start_flow(requested_scopes=SCOPES, refresh_tokens=True)
    authorize_url = NATIVE_CLIENT.oauth2_get_authorize_url()
    print(f"Please go to this URL and login:\n\n{authorize_url}\n")
    auth_code = input("Please enter the code here: ").strip()
    tokens = NATIVE_CLIENT.oauth2_exchange_code_for_tokens(auth_code)
    return tokens


def get_authorizer():
    # try to load the tokens from the file, possibly returning None
    if MY_FILE_ADAPTER.file_exists():
        tokens = MY_FILE_ADAPTER.get_token_data(RESOURCE_SERVER)
    else:
        tokens = None

    if tokens is None:
        # do a login flow, getting back initial tokens
        response = do_login_flow()
        # now store the tokens and pull out the correct token
        MY_FILE_ADAPTER.store(response)
        tokens = response.by_resource_server[RESOURCE_SERVER]

    return globus_sdk.RefreshTokenAuthorizer(
        tokens["refresh_token"],
        NATIVE_CLIENT,
        access_token=tokens["access_token"],
        expires_at=tokens["expires_at_seconds"],
        on_refresh=MY_FILE_ADAPTER.on_refresh,
    )


def get_flows_client():
    return globus_sdk.FlowsClient(authorizer=get_authorizer())


def create_flow(args):
    flows_client = get_flows_client()
    print(
        flows_client.create_flow(
            title=args.title,
            definition={
                "StartAt": "DoIt",
                "States": {
                    "DoIt": {
                        "Type": "Action",
                        "ActionUrl": "https://actions.globus.org/hello_world",
                        "Parameters": {
                            "echo_string": "Hello, Asynchronous World!",
                        },
                        "End": True,
                    }
                },
            },
            input_schema={},
            subtitle="A flow created by the SDK tutorial",
        )
    )


def delete_flow(args):
    flows_client = get_flows_client()
    print(flows_client.delete_flow(args.flow_id))


def list_flows():
    flows_client = get_flows_client()
    for flow in flows_client.list_flows(filter_role="flow_owner"):
        print(f"title: {flow['title']}")
        print(f"id: {flow['id']}")
        print()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("action", choices=["create", "delete", "list"])
    parser.add_argument("-f", "--flow-id", help="Flow ID for delete")
    parser.add_argument("-t", "--title", help="Name for create")
    args = parser.parse_args()

    try:
        if args.action == "create":
            if args.title is None:
                parser.error("create requires --title")
            create_flow(args)
        elif args.action == "delete":
            if args.flow_id is None:
                parser.error("delete requires --flow-id")
            delete_flow(args)
        elif args.action == "list":
            list_flows()
        else:
            raise NotImplementedError()
    except globus_sdk.FlowsAPIError as e:
        print(f"API Error: {e.code} {e.message}")
        print(e.text)
        sys.exit(1)


if __name__ == "__main__":
    main()

Run a Flow#

This next example is distinct. It runs a flow but has no capability to create or delete a flow. Note how SpecificFlowClient is used – this class allows users to access the flow-specific scope and provides the methods associated with that scope of running flows and resuming runs.

The login code is slightly different from the previous example, as it has to key off of the Flow ID in order to act appropriately.

run_flow_minimal.py [download]#
#!/usr/bin/env python

import argparse
import os
import sys

import globus_sdk
from globus_sdk.tokenstorage import SimpleJSONFileAdapter

MY_FILE_ADAPTER = SimpleJSONFileAdapter(os.path.expanduser("~/.sdk-manage-flow.json"))

# tutorial client ID
# we recommend replacing this with your own client for any production use-cases
CLIENT_ID = "61338d24-54d5-408f-a10d-66c06b59f6d2"

NATIVE_CLIENT = globus_sdk.NativeAppAuthClient(CLIENT_ID)


def do_login_flow(scope):
    NATIVE_CLIENT.oauth2_start_flow(requested_scopes=scope, refresh_tokens=True)
    authorize_url = NATIVE_CLIENT.oauth2_get_authorize_url()
    print(f"Please go to this URL and login:\n\n{authorize_url}\n")
    auth_code = input("Please enter the code here: ").strip()
    tokens = NATIVE_CLIENT.oauth2_exchange_code_for_tokens(auth_code)
    return tokens


def get_authorizer(flow_id):
    scopes = globus_sdk.SpecificFlowClient(flow_id).scopes

    # try to load the tokens from the file, possibly returning None
    if MY_FILE_ADAPTER.file_exists():
        tokens = MY_FILE_ADAPTER.get_token_data(flow_id)
    else:
        tokens = None

    if tokens is None:
        # do a login flow, getting back initial tokens
        response = do_login_flow(scopes.user)
        # now store the tokens and pull out the correct token
        MY_FILE_ADAPTER.store(response)
        tokens = response.by_resource_server[flow_id]

    return globus_sdk.RefreshTokenAuthorizer(
        tokens["refresh_token"],
        NATIVE_CLIENT,
        access_token=tokens["access_token"],
        expires_at=tokens["expires_at_seconds"],
        on_refresh=MY_FILE_ADAPTER.on_refresh,
    )


def get_flow_client(flow_id):
    authorizer = get_authorizer(flow_id)
    return globus_sdk.SpecificFlowClient(flow_id, authorizer=authorizer)


def run_flow(args):
    flow_client = get_flow_client(args.FLOW_ID)
    print(flow_client.run_flow({}))


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("FLOW_ID", help="Flow ID to run")
    args = parser.parse_args()

    try:
        run_flow(args)
    except globus_sdk.FlowsAPIError as e:
        print(f"API Error: {e.code} {e.message}")
        print(e.text)
        sys.exit(1)


if __name__ == "__main__":
    main()

Create, Delete, and Run Flows#

The following example combines the previous two. It has to further enhance the login code to account for the two different styles of login which it supports, but minimal other adjustments are needed.

Depending on the operation chosen, either the FlowsClient or the SpecificFlowClient will be used, and the login flow will also be appropriately parametrized.

manage_flow.py [download]#
#!/usr/bin/env python
import argparse
import os
import sys

import globus_sdk
from globus_sdk.tokenstorage import SimpleJSONFileAdapter

MY_FILE_ADAPTER = SimpleJSONFileAdapter(os.path.expanduser("~/.sdk-manage-flow.json"))

# tutorial client ID
# we recommend replacing this with your own client for any production use-cases
CLIENT_ID = "61338d24-54d5-408f-a10d-66c06b59f6d2"

NATIVE_CLIENT = globus_sdk.NativeAppAuthClient(CLIENT_ID)


def do_login_flow(scope):
    NATIVE_CLIENT.oauth2_start_flow(requested_scopes=scope, refresh_tokens=True)
    authorize_url = NATIVE_CLIENT.oauth2_get_authorize_url()
    print(f"Please go to this URL and login:\n\n{authorize_url}\n")
    auth_code = input("Please enter the code here: ").strip()
    tokens = NATIVE_CLIENT.oauth2_exchange_code_for_tokens(auth_code)
    return tokens


def get_authorizer(flow_id=None):
    if flow_id:
        resource_server = flow_id
        scope = globus_sdk.SpecificFlowClient(flow_id).scopes.user
    else:
        resource_server = globus_sdk.FlowsClient.resource_server
        scope = globus_sdk.FlowsClient.scopes.manage_flows

    # try to load the tokens from the file, possibly returning None
    if MY_FILE_ADAPTER.file_exists():
        tokens = MY_FILE_ADAPTER.get_token_data(resource_server)
    else:
        tokens = None

    if tokens is None:
        # do a login flow, getting back initial tokens
        response = do_login_flow(scope)
        # now store the tokens and pull out the correct token
        MY_FILE_ADAPTER.store(response)
        tokens = response.by_resource_server[resource_server]

    return globus_sdk.RefreshTokenAuthorizer(
        tokens["refresh_token"],
        NATIVE_CLIENT,
        access_token=tokens["access_token"],
        expires_at=tokens["expires_at_seconds"],
        on_refresh=MY_FILE_ADAPTER.on_refresh,
    )


def get_flows_client():
    return globus_sdk.FlowsClient(authorizer=get_authorizer())


def get_specific_flow_client(flow_id):
    authorizer = get_authorizer(flow_id)
    return globus_sdk.SpecificFlowClient(flow_id, authorizer=authorizer)


def create_flow(args):
    flows_client = get_flows_client()
    print(
        flows_client.create_flow(
            title=args.title,
            definition={
                "StartAt": "DoIt",
                "States": {
                    "DoIt": {
                        "Type": "Action",
                        "ActionUrl": "https://actions.globus.org/hello_world",
                        "Parameters": {
                            "echo_string": "Hello, Asynchronous World!",
                        },
                        "End": True,
                    }
                },
            },
            input_schema={},
            subtitle="A flow created by the SDK tutorial",
        )
    )


def delete_flow(args):
    flows_client = get_flows_client()
    print(flows_client.delete_flow(args.flow_id))


def list_flows():
    flows_client = get_flows_client()
    for flow in flows_client.list_flows(filter_role="flow_owner"):
        print(f"title: {flow['title']}")
        print(f"id: {flow['id']}")
        print()


def run_flow(args):
    flow_client = get_specific_flow_client(args.flow_id)
    print(flow_client.run_flow({}))


def logout():
    for tokendata in MY_FILE_ADAPTER.get_by_resource_server().values():
        for tok_key in ("access_token", "refresh_token"):
            token = tokendata[tok_key]
            NATIVE_CLIENT.oauth2_revoke_token(token)

    os.remove(MY_FILE_ADAPTER.filename)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("action", choices=["logout", "create", "delete", "list", "run"])
    parser.add_argument("-f", "--flow-id", help="Flow ID for delete and run")
    parser.add_argument("-t", "--title", help="Name for create")
    args = parser.parse_args()

    try:
        if args.action == "logout":
            logout()
        elif args.action == "create":
            if args.title is None:
                parser.error("create requires --title")
            create_flow(args)
        elif args.action == "delete":
            if args.flow_id is None:
                parser.error("delete requires --flow-id")
            delete_flow(args)
        elif args.action == "list":
            list_flows()
        elif args.action == "run":
            if args.flow_id is None:
                parser.error("run requires --flow-id")
            run_flow(args)
        else:
            raise NotImplementedError()
    except globus_sdk.FlowsAPIError as e:
        print(f"API Error: {e.code} {e.message}")
        print(e.text)
        sys.exit(1)


if __name__ == "__main__":
    main()