Source code for globus_sdk.services.transfer.data.transfer_data

from __future__ import annotations

import datetime
import logging
import sys
import typing as t

if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal

from globus_sdk import exc, utils
from globus_sdk._types import UUIDLike

if t.TYPE_CHECKING:
    import globus_sdk

log = logging.getLogger(__name__)
_StrSyncLevel = Literal["exists", "size", "mtime", "checksum"]
_sync_level_dict: dict[_StrSyncLevel, int] = {
    "exists": 0,
    "size": 1,
    "mtime": 2,
    "checksum": 3,
}


def _parse_sync_level(sync_level: _StrSyncLevel | int) -> int:
    """
    Map sync_level strings to known int values

    Important: if more levels are added in the future you can always pass as an int
    """
    if isinstance(sync_level, str):
        try:
            sync_level = _sync_level_dict[sync_level]
        except KeyError as err:
            raise ValueError(f"Unrecognized sync_level {sync_level}") from err
    return sync_level


[docs] class TransferData(utils.PayloadWrapper): r""" Convenience class for constructing a transfer document, to use as the ``data`` parameter to :meth:`submit_transfer <globus_sdk.TransferClient.submit_transfer>`. At least one item must be added using :meth:`add_item <globus_sdk.TransferData.add_item>`. If ``submission_id`` isn't passed, one will be fetched automatically. The submission ID can be pulled out of here to inspect, but the document can be used as-is multiple times over to retry a potential submission failure (so there shouldn't be any need to inspect it). :param transfer_client: A ``TransferClient`` instance which will be used to get a submission ID if one is not supplied. Should be the same instance that is used to submit the transfer. :param source_endpoint: The endpoint ID of the source endpoint :param destination_endpoint: The endpoint ID of the destination endpoint :param label: A string label for the Task :param submission_id: A submission ID value fetched via :meth:`get_submission_id \ <globus_sdk.TransferClient.get_submission_id>`. Defaults to using ``transfer_client.get_submission_id`` :param sync_level: The method used to compare items between the source and destination. One of ``"exists"``, ``"size"``, ``"mtime"``, or ``"checksum"`` See the section below on sync-level for an explanation of values. :param verify_checksum: When true, after transfer verify that the source and destination file checksums match. If they don't, re-transfer the entire file and keep trying until it succeeds. This will create CPU load on both the origin and destination of the transfer, and may even be a bottleneck if the network speed is high enough. [default: ``False``] :param preserve_timestamp: When true, Globus Transfer will attempt to set file timestamps on the destination to match those on the origin. [default: ``False``] :param encrypt_data: When true, all files will be TLS-protected during transfer. [default: ``False``] :param deadline: An ISO-8601 timestamp (as a string) or a datetime object which defines a deadline for the transfer. At the deadline, even if the data transfer is not complete, the job will be canceled. We recommend ensuring that the timestamp is in UTC to avoid confusion and ambiguity. Examples of ISO-8601 timestamps include ``2017-10-12 09:30Z``, ``2017-10-12 12:33:54+00:00``, and ``2017-10-12`` :param recursive_symlinks: Specify the behavior of recursive directory transfers when encountering symlinks. One of ``"ignore"``, ``"keep"``, or ``"copy"``. ``"ignore"`` skips symlinks, ``"keep"`` creates symlinks at the destination matching the source (without modifying the link path at all), and ``"copy"`` follows symlinks on the source, failing if the link is invalid. [default: ``"ignore"``] :param skip_activation_check: When true, allow submission even if the endpoints aren't currently activated :param skip_source_errors: When true, source permission denied and file not found errors from the source endpoint will cause the offending path to be skipped. [default: ``False``] :param fail_on_quota_errors: When true, quota exceeded errors will cause the task to fail. [default: ``False``] :param delete_destination_extra: Delete files, directories, and symlinks on the destination endpoint which don’t exist on the source endpoint or are a different type. Only applies for recursive directory transfers. [default: ``False``] :param notify_on_succeeded: Send a notification email when the transfer completes with a status of SUCCEEDED. [default: ``True``] :param notify_on_failed: Send a notification email when the transfer completes with a status of FAILED. [default: ``True``] :param notify_on_inactive: Send a notification email when the transfer changes status to INACTIVE. e.g. From credentials expiring. [default: ``True``] :param source_local_user: Optional value passed to the source's identity mapping specifying which local user account to map to. Only usable with Globus Connect Server v5 mapped collections. :param destination_local_user: Optional value passed to the destination's identity mapping specifying which local user account to map to. Only usable with Globus Connect Server v5 mapped collections. :param additional_fields: additional fields to be added to the transfer document. Mostly intended for internal use **Sync Levels** The values for ``sync_level`` are used to determine how comparisons are made between files found both on the source and the destination. When files match, no data transfer will occur. For compatibility, this can be an integer ``0``, ``1``, ``2``, or ``3`` in addition to the string values. The meanings are as follows: ===================== ======== value behavior ===================== ======== ``0``, ``exists`` Determine whether or not to transfer based on file existence. If the destination file is absent, do the transfer. ``1``, ``size`` Determine whether or not to transfer based on the size of the file. If destination file size does not match the source, do the transfer. ``2``, ``mtime`` Determine whether or not to transfer based on modification times. If source has a newer modified time than the destination, do the transfer. ``3``, ``checksum`` Determine whether or not to transfer based on checksums of file contents. If source and destination contents differ, as determined by a checksum of their contents, do the transfer. ===================== ======== **Examples** See the :meth:`submit_transfer <globus_sdk.TransferClient.submit_transfer>` documentation for example usage. **External Documentation** See the `Task document definition \ <https://docs.globus.org/api/transfer/task_submit/#document_types>`_ and `Transfer specific fields \ <https://docs.globus.org/api/transfer/task_submit/#transfer_specific_fields>`_ in the REST documentation for more details on Transfer Task documents. .. automethodlist:: globus_sdk.TransferData """ def __init__( self, transfer_client: globus_sdk.TransferClient | None = None, source_endpoint: UUIDLike | None = None, destination_endpoint: UUIDLike | None = None, *, label: str | None = None, submission_id: UUIDLike | None = None, sync_level: _StrSyncLevel | int | None = None, verify_checksum: bool = False, preserve_timestamp: bool = False, encrypt_data: bool = False, deadline: datetime.datetime | str | None = None, skip_activation_check: bool | None = None, skip_source_errors: bool = False, fail_on_quota_errors: bool = False, recursive_symlinks: str | None = None, delete_destination_extra: bool = False, notify_on_succeeded: bool = True, notify_on_failed: bool = True, notify_on_inactive: bool = True, source_local_user: str | None = None, destination_local_user: str | None = None, additional_fields: dict[str, t.Any] | None = None, ) -> None: super().__init__() # these must be checked explicitly to handle the fact that `transfer_client` is # the first arg if source_endpoint is None: raise exc.GlobusSDKUsageError("source_endpoint is required") if destination_endpoint is None: raise exc.GlobusSDKUsageError("destination_endpoint is required") log.info("Creating a new TransferData object") self["DATA_TYPE"] = "transfer" self["DATA"] = [] self._set_optstrs( source_endpoint=source_endpoint, destination_endpoint=destination_endpoint, label=label, submission_id=submission_id or ( transfer_client.get_submission_id()["value"] if transfer_client else None ), recursive_symlinks=recursive_symlinks, deadline=deadline, source_local_user=source_local_user, destination_local_user=destination_local_user, ) self._set_optbools( verify_checksum=verify_checksum, preserve_timestamp=preserve_timestamp, encrypt_data=encrypt_data, skip_activation_check=skip_activation_check, skip_source_errors=skip_source_errors, fail_on_quota_errors=fail_on_quota_errors, delete_destination_extra=delete_destination_extra, notify_on_succeeded=notify_on_succeeded, notify_on_failed=notify_on_failed, notify_on_inactive=notify_on_inactive, ) self._set_value("sync_level", sync_level, callback=_parse_sync_level) for k, v in self.items(): log.info("TransferData.%s = %s", k, v) if additional_fields is not None: self.update(additional_fields) for option, value in additional_fields.items(): log.info( f"TransferData.{option} = {value} (option passed " "in via additional_fields)" )
[docs] def add_item( self, source_path: str, destination_path: str, *, recursive: bool | None = None, external_checksum: str | None = None, checksum_algorithm: str | None = None, additional_fields: dict[str, t.Any] | None = None, ) -> None: """ Add a file or directory to be transferred. If the item is a symlink to a file or directory, the file or directory at the target of the symlink will be transferred. Appends a transfer_item document to the DATA key of the transfer document. .. note:: The full path to the destination file must be provided for file items. Parent directories of files are not allowed. See `task submission documentation <https://docs.globus.org/api/transfer/task_submit/#submit_transfer_task>`_ for more details. :param source_path: Path to the source directory or file to be transferred :param destination_path: Path to the destination directory or file will be transferred to :param recursive: Set to True if the target at source path is a directory :param external_checksum: A checksum to verify both source file and destination file integrity. The checksum will be verified after the data transfer and a failure will cause the entire task to fail. Cannot be used with directories. Assumed to be an MD5 checksum unless checksum_algorithm is also given. :param checksum_algorithm: Specifies the checksum algorithm to be used when verify_checksum is True, sync_level is "checksum" or 3, or an external_checksum is given. :param additional_fields: additional fields to be added to the transfer item """ item_data: dict[str, t.Any] = { "DATA_TYPE": "transfer_item", "source_path": source_path, "destination_path": destination_path, } if recursive is not None: item_data["recursive"] = recursive if external_checksum is not None: item_data["external_checksum"] = external_checksum if checksum_algorithm is not None: item_data["checksum_algorithm"] = checksum_algorithm if additional_fields is not None: item_data.update(additional_fields) log.debug( 'TransferData[{}, {}].add_item: "{}"->"{}"'.format( self["source_endpoint"], self["destination_endpoint"], source_path, destination_path, ) ) self["DATA"].append(item_data)
[docs] def add_filter_rule( self, name: str, *, method: Literal["include", "exclude"] = "exclude", type: None | Literal["file", "dir"] = None, # pylint: disable=redefined-builtin ) -> None: """ Add a filter rule to the transfer document. These rules specify which items are or are not included when recursively transferring directories. Each item that is found during recursive directory traversal is matched against these rules in the order they are listed. The method of the first filter rule that matches an item is applied (either "include" or "exclude"), and filter rule matching stops. If no rules match, the item is included in the transfer. Notably, this makes "include" filter rules only useful when overriding more general "exclude" filter rules later in the list. :param name: A pattern to match against item names. Wildcards are supported, as are character groups: ``*`` matches everything, ``?`` matches any single character, ``[]`` matches any single character within the brackets, and ``[!]`` matches any single character not within the brackets. :param method: The method to use for filtering. If "exclude" (the default) items matching this rule will not be included in the transfer. If "include" items matching this rule will be included in the transfer. :param type: The types of items on which to apply this filter rule. Either ``"file"`` or ``"dir"``. If unspecified, the rule applies to both. Note that if a ``"dir"`` is excluded then all items within it will also be excluded regardless if they would have matched any include rules. Example Usage: >>> tdata = TransferData(...) >>> tdata.add_filter_rule(method="exclude", "*.tgz", type="file") >>> tdata.add_filter_rule(method="exclude", "*.tar.gz", type="file") ``tdata`` now describes a transfer which will skip any gzipped tar files with the extensions ``.tgz`` or ``.tar.gz`` >>> tdata = TransferData(...) >>> tdata.add_filter_rule(method="include", "*.txt", type="file") >>> tdata.add_filter_rule(method="exclude", "*", type="file") ``tdata`` now describes a transfer which will only transfer files with the ``.txt`` extension. """ if "filter_rules" not in self: self["filter_rules"] = [] rule = { "DATA_TYPE": "filter_rule", "method": method, "name": name, } if type is not None: rule["type"] = type self["filter_rules"].append(rule)
[docs] def iter_items(self) -> t.Iterator[dict[str, t.Any]]: """ An iterator of items created by ``add_item``. Each item takes the form of a dictionary. """ yield from iter(self["DATA"])