Timer Operations ScriptΒΆ

This example demonstrates the usage of methods on the Timer client. Note that we first need a Transfer client set up in order to create the TransferData we will use to define our recurring Timer job.

import os
from datetime import datetime, timedelta

from globus_sdk import (
    AccessTokenAuthorizer,
    NativeAppAuthClient,
    TimerClient,
    TimerJob,
    TransferClient,
    TransferData,
)

GOEP1 = "ddb59aef-6d04-11e5-ba46-22000b92c6ec"
GOEP2 = "ddb59af0-6d04-11e5-ba46-22000b92c6ec"
TIMER_CLIENT_ID = "524230d7-ea86-4a52-8312-86065a9e0417"
TIMER_SCOPE = f"https://auth.globus.org/scopes/{TIMER_CLIENT_ID}/timer"
TRANSFER_SCOPE = "urn:globus:auth:scope:transfer.api.globus.org:all"
SCOPES = [TIMER_SCOPE, TRANSFER_SCOPE]

# Get access tokens to use for both Transfer and Timer
native_client.oauth2_start_flow(requested_scopes=SCOPES)
authorize_url = native_client.oauth2_get_authorize_url()
print(f"Please go to this URL and login:\n\n{authorize_url}\n")
auth_code = input("Enter the auth code here: ").strip()
token_response = native_client.oauth2_exchange_code_for_tokens(auth_code)
timer_token = token_response.by_resource_server[TIMER_CLIENT_ID]["access_token"]
transfer_token = token_response.by_resource_server["transfer.api.globus.org"][
    "access_token"
]

# Set up a transfer client and create a `TransferData` object
transfer_authorizer = AccessTokenAuthorizer(transfer_token)
transfer_client = TransferClient(authorizer=transfer_authorizer)
data = TransferData(transfer_client, GOEP1, GOEP2)
data.add_item("/share/godata/file1.txt", "/~/file1.txt")

# Set up the Timer client
timer_authorizer = AccessTokenAuthorizer(timer_token)
timer_client = TimerClient(authorizer=timer_authorizer)

# Create a Timer job, set to run the above transfer 2 times
start = datetime.utcnow()
interval = timedelta(minutes=30)
name = "sdk-timer-example"
job = TimerJob.from_transfer_data(data, start, interval, stop_after_n=2, name=name)
response = timer_client.create_job(job)
assert response.http_status == 201
job_id = response["job_id"]
print(f"Timer job ID: {job_id}")

all_jobs = timer_client.list_jobs()
assert job_id in {job["job_id"] for job in timer_client.list_jobs()["jobs"]}

get_job_response = timer_client.get_job(job_id)
assert get_job_response.http_status == 200
assert get_job_response["name"] == name

# Later, you could run this function to clean up the job:
def delete_timer_job(job_id):
    delete_job_response = timer_client.delete_job(job_id)
    assert delete_job_response.http_status == 200