Source code for globus_sdk.services.timer.client

from __future__ import annotations

import logging
import typing as t

from globus_sdk import client, exc, response
from globus_sdk._types import UUIDLike
from globus_sdk.scopes import TimerScopes

from .data import TimerJob, TransferTimer
from .errors import TimerAPIError

log = logging.getLogger(__name__)


[docs] class TimerClient(client.BaseClient): r""" Client for the Globus Timer API. .. automethodlist:: globus_sdk.TimerClient """ error_class = TimerAPIError service_name = "timer" scopes = TimerScopes
[docs] def list_jobs( self, *, query_params: dict[str, t.Any] | None = None ) -> response.GlobusHTTPResponse: """ ``GET /jobs/`` :param query_params: additional parameters to pass as query params **Examples** >>> timer_client = globus_sdk.TimerClient(...) >>> jobs = timer_client.list_jobs() """ log.info(f"TimerClient.list_jobs({query_params})") return self.get("/jobs/", query_params=query_params)
[docs] def get_job( self, job_id: UUIDLike, *, query_params: dict[str, t.Any] | None = None, ) -> response.GlobusHTTPResponse: """ ``GET /jobs/<job_id>`` :param job_id: the ID of the timer ("job") :param query_params: additional parameters to pass as query params **Examples** >>> timer_client = globus_sdk.TimerClient(...) >>> job = timer_client.get_job(job_id) >>> assert job["job_id"] == job_id """ log.info(f"TimerClient.get_job({job_id})") return self.get(f"/jobs/{job_id}", query_params=query_params)
[docs] def create_timer( self, timer: dict[str, t.Any] | TransferTimer ) -> response.GlobusHTTPResponse: """ :param timer: a document defining the new timer A ``TransferTimer`` object can be constructed from a ``TransferData`` object, which is the recommended way to create a timer for data transfers. **Examples** .. tab-set:: .. tab-item:: Example Usage .. code-block:: pycon >>> transfer_client = TransferClient(...) >>> transfer_data = TransferData(transfer_client, ...) >>> timer_client = globus_sdk.TimerClient(...) >>> create_doc = globus_sdk.TransferTimer( ... name="my-timer", ... schedule={"type": "recurring", "interval": 1800}, ... body=transfer_data, ... ) >>> response = timer_client.create_timer(timer=create_doc) .. tab-item:: Example Response Data .. expandtestfixture:: timer.create_timer .. tab-item:: API Info ``POST /v2/timer`` """ if isinstance(timer, TimerJob): raise exc.GlobusSDKUsageError( "Cannot pass a TimerJob to create_timer(). " "Create a TransferTimer instead." ) log.info("TimerClient.create_timer(...)") return self.post("/v2/timer", data={"timer": timer})
[docs] def create_job( self, data: dict[str, t.Any] | TimerJob ) -> response.GlobusHTTPResponse: """ ``POST /jobs/`` :param data: a timer document used to create the new timer ("job") **Examples** >>> from datetime import datetime, timedelta >>> transfer_client = TransferClient(...) >>> transfer_data = TransferData(transfer_client, ...) >>> timer_client = globus_sdk.TimerClient(...) >>> job = TimerJob.from_transfer_data( ... transfer_data, ... datetime.utcnow(), ... timedelta(days=14), ... name="my-timer-job" ... ) >>> timer_result = timer_client.create_job(job) """ if isinstance(data, TransferTimer): raise exc.GlobusSDKUsageError( "Cannot pass a TransferTimer to create_job(). Use create_timer() " "instead." ) log.info(f"TimerClient.create_job({data})") return self.post("/jobs/", data=data)
[docs] def update_job( self, job_id: UUIDLike, data: dict[str, t.Any] ) -> response.GlobusHTTPResponse: """ ``PATCH /jobs/<job_id>`` :param job_id: the ID of the timer ("job") :param data: a partial timer document used to update the job **Examples** >>> timer_client = globus_sdk.TimerClient(...) >>> timer_client.update_job(job_id, {"name": "new name}"}) """ log.info(f"TimerClient.update_job({job_id}, {data})") return self.patch(f"/jobs/{job_id}", data=data)
[docs] def delete_job( self, job_id: UUIDLike, ) -> response.GlobusHTTPResponse: """ ``DELETE /jobs/<job_id>`` :param job_id: the ID of the timer ("job") **Examples** >>> timer_client = globus_sdk.TimerClient(...) >>> timer_client.delete_job(job_id) """ log.info(f"TimerClient.delete_job({job_id})") return self.delete(f"/jobs/{job_id}")
[docs] def pause_job( self, job_id: UUIDLike, ) -> response.GlobusHTTPResponse: """ Make a timer job inactive, preventing it from running until it is resumed. :param job_id: The ID of the timer to pause **Examples** >>> timer_client = globus_sdk.TimerClient(...) >>> timer_client.pause_job(job_id) """ log.info(f"TimerClient.pause_job({job_id})") return self.post(f"/jobs/{job_id}/pause")
[docs] def resume_job( self, job_id: UUIDLike, *, update_credentials: bool | None = None, ) -> response.GlobusHTTPResponse: """ Resume an inactive timer job, optionally replacing credentials to resolve issues with insufficient authorization. :param job_id: The ID of the timer to resume :param update_credentials: When true, replace the credentials for the timer using the credentials for this resume call. This can be used to resolve authorization errors (such as session and consent errors), but it also could introduce session and consent errors, if the credentials being used to resume lack some necessary properties of the credentials they're replacing. If not supplied, the Timers service will determine whether to replace credentials according to the reason why the timer job became inactive. **Examples** >>> timer_client = globus_sdk.TimerClient(...) >>> timer_client.resume_job(job_id) """ log.info(f"TimerClient.resume_job({job_id})") data = {} if update_credentials is not None: data["update_credentials"] = update_credentials return self.post(f"/jobs/{job_id}/resume", data=data)