from __future__ import annotations
import logging
import typing as t
import uuid
from globus_sdk import _guards, client, exc, response
from globus_sdk._types import UUIDLike
from globus_sdk.scopes import (
GCSCollectionScopeBuilder,
Scope,
TimersScopes,
TransferScopes,
)
from .data import TimerJob, TransferTimer
from .errors import TimersAPIError
log = logging.getLogger(__name__)
[docs]
class TimersClient(client.BaseClient):
r"""
Client for the Globus Timer API.
.. sdk-sphinx-copy-params:: BaseClient
.. automethodlist:: globus_sdk.TimersClient
"""
error_class = TimersAPIError
service_name = "timer"
scopes = TimersScopes
default_scope_requirements = [Scope(TimersScopes.timer)]
[docs]
def add_app_transfer_data_access_scope(
self, collection_ids: UUIDLike | t.Iterable[UUIDLike]
) -> TimersClient:
"""
Add a dependent ``data_access`` scope for one or more given ``collection_ids``
to this client's ``GlobusApp``, under the Transfer ``all`` scope.
Useful for preventing ``ConsentRequired`` errors when creating timers
that use Globus Connect Server mapped collection(s) as the source or
destination.
.. warning::
This method must only be used on ``collection_ids`` for non-High-Assurance
GCS Mapped Collections.
Use on other collection types, e.g., on GCP Mapped Collections or any form
of Guest Collection, will result in "Unknown Scope" errors during the login
flow.
Returns ``self`` for chaining.
Raises ``GlobusSDKUsageError`` if this client was not initialized with an app.
:param collection_ids: a collection ID or an iterable of IDs.
.. tab-set::
.. tab-item:: Example Usage
.. code-block:: python
app = UserApp("myapp", client_id=NATIVE_APP_CLIENT_ID)
client = TimersClient(app=app).add_app_transfer_data_access_scope(COLLECTION_ID)
transfer_data = TransferData(
source_endpoint=COLLECTION_ID, destination_endpoint=COLLECTION_ID
)
transfer_data.add_item("/staging/", "/active/")
daily_timer = TransferTimer(
name="test_timer", schedule=RecurringTimerSchedule(24 * 60 * 60), body=transfer_data
)
client.create_timer(daily_timer)
""" # noqa: E501
if isinstance(collection_ids, (str, uuid.UUID)):
_guards.validators.uuidlike("collection_ids", collection_ids)
# wrap the collection_ids input in a list for consistent iteration below
collection_ids_ = [collection_ids]
else:
# copy to a list so that ephemeral iterables can be iterated multiple times
collection_ids_ = list(collection_ids)
for i, c in enumerate(collection_ids_):
_guards.validators.uuidlike(f"collection_ids[{i}]", c)
transfer_scope = Scope(TransferScopes.all)
for coll_id in collection_ids_:
data_access_scope = Scope(
GCSCollectionScopeBuilder(str(coll_id)).data_access,
optional=True,
)
transfer_scope.add_dependency(data_access_scope)
timers_scope = Scope(TimersScopes.timer)
timers_scope.add_dependency(transfer_scope)
self.add_app_scope(timers_scope)
return self
[docs]
def list_jobs(
self, *, query_params: dict[str, t.Any] | None = None
) -> response.GlobusHTTPResponse:
"""
``GET /jobs/``
:param query_params: additional parameters to pass as query params
**Examples**
>>> timer_client = globus_sdk.TimersClient(...)
>>> jobs = timer_client.list_jobs()
"""
log.debug(f"TimersClient.list_jobs({query_params})")
return self.get("/jobs/", query_params=query_params)
[docs]
def get_job(
self,
job_id: UUIDLike,
*,
query_params: dict[str, t.Any] | None = None,
) -> response.GlobusHTTPResponse:
"""
``GET /jobs/<job_id>``
:param job_id: the ID of the timer ("job")
:param query_params: additional parameters to pass as query params
**Examples**
>>> timer_client = globus_sdk.TimersClient(...)
>>> job = timer_client.get_job(job_id)
>>> assert job["job_id"] == job_id
"""
log.debug(f"TimersClient.get_job({job_id})")
return self.get(f"/jobs/{job_id}", query_params=query_params)
[docs]
def create_timer(
self, timer: dict[str, t.Any] | TransferTimer
) -> response.GlobusHTTPResponse:
"""
:param timer: a document defining the new timer
A ``TransferTimer`` object can be constructed from a ``TransferData`` object,
which is the recommended way to create a timer for data transfers.
**Examples**
.. tab-set::
.. tab-item:: Example Usage
.. code-block:: pycon
>>> transfer_client = TransferClient(...)
>>> transfer_data = TransferData(transfer_client, ...)
>>> timer_client = globus_sdk.TimersClient(...)
>>> create_doc = globus_sdk.TransferTimer(
... name="my-timer",
... schedule={"type": "recurring", "interval": 1800},
... body=transfer_data,
... )
>>> response = timer_client.create_timer(timer=create_doc)
.. tab-item:: Example Response Data
.. expandtestfixture:: timer.create_timer
.. tab-item:: API Info
``POST /v2/timer``
"""
if isinstance(timer, TimerJob):
raise exc.GlobusSDKUsageError(
"Cannot pass a TimerJob to create_timer(). "
"Create a TransferTimer instead."
)
log.debug("TimersClient.create_timer(...)")
return self.post("/v2/timer", data={"timer": timer})
[docs]
def create_job(
self, data: dict[str, t.Any] | TimerJob
) -> response.GlobusHTTPResponse:
"""
``POST /jobs/``
:param data: a timer document used to create the new timer ("job")
**Examples**
>>> from datetime import datetime, timedelta
>>> transfer_client = TransferClient(...)
>>> transfer_data = TransferData(transfer_client, ...)
>>> timer_client = globus_sdk.TimersClient(...)
>>> job = TimerJob.from_transfer_data(
... transfer_data,
... datetime.utcnow(),
... timedelta(days=14),
... name="my-timer-job"
... )
>>> timer_result = timer_client.create_job(job)
"""
if isinstance(data, TransferTimer):
raise exc.GlobusSDKUsageError(
"Cannot pass a TransferTimer to create_job(). Use create_timer() "
"instead."
)
log.debug(f"TimersClient.create_job({data})")
return self.post("/jobs/", data=data)
[docs]
def update_job(
self, job_id: UUIDLike, data: dict[str, t.Any]
) -> response.GlobusHTTPResponse:
"""
``PATCH /jobs/<job_id>``
:param job_id: the ID of the timer ("job")
:param data: a partial timer document used to update the job
**Examples**
>>> timer_client = globus_sdk.TimersClient(...)
>>> timer_client.update_job(job_id, {"name": "new name}"})
"""
log.debug(f"TimersClient.update_job({job_id}, {data})")
return self.patch(f"/jobs/{job_id}", data=data)
[docs]
def delete_job(
self,
job_id: UUIDLike,
) -> response.GlobusHTTPResponse:
"""
``DELETE /jobs/<job_id>``
:param job_id: the ID of the timer ("job")
**Examples**
>>> timer_client = globus_sdk.TimersClient(...)
>>> timer_client.delete_job(job_id)
"""
log.debug(f"TimersClient.delete_job({job_id})")
return self.delete(f"/jobs/{job_id}")
[docs]
def pause_job(
self,
job_id: UUIDLike,
) -> response.GlobusHTTPResponse:
"""
Make a timer job inactive, preventing it from running until it is resumed.
:param job_id: The ID of the timer to pause
**Examples**
>>> timer_client = globus_sdk.TimersClient(...)
>>> timer_client.pause_job(job_id)
"""
log.debug(f"TimersClient.pause_job({job_id})")
return self.post(f"/jobs/{job_id}/pause")
[docs]
def resume_job(
self,
job_id: UUIDLike,
*,
update_credentials: bool | None = None,
) -> response.GlobusHTTPResponse:
"""
Resume an inactive timer job, optionally replacing credentials to resolve
issues with insufficient authorization.
:param job_id: The ID of the timer to resume
:param update_credentials: When true, replace the credentials for the timer
using the credentials for this resume call. This can be used to resolve
authorization errors (such as session and consent errors), but it also
could introduce session and consent errors, if the credentials being used
to resume lack some necessary properties of the credentials they're
replacing. If not supplied, the Timers service will determine whether to
replace credentials according to the reason why the timer job
became inactive.
**Examples**
>>> timer_client = globus_sdk.TimersClient(...)
>>> timer_client.resume_job(job_id)
"""
log.debug(f"TimersClient.resume_job({job_id})")
data = {}
if update_credentials is not None:
data["update_credentials"] = update_credentials
return self.post(f"/jobs/{job_id}/resume", data=data)