Source code for

from __future__ import annotations

import typing as t
import uuid

from .client import AuthClient

def is_username(val: str) -> bool:
    If the value parses as a UUID, then it's an ID, not a username.
    If it does not parse as such, then it must be a username.
        return False
    except ValueError:
        return True

def split_ids_and_usernames(
    identity_ids: t.Iterable[str],
) -> tuple[set[str], set[str]]:
    ids = set()
    usernames = set()

    for val in identity_ids:
        if is_username(val):

    return ids, usernames

[docs]class IdentityMap: r""" There's a common pattern of having a large batch of Globus Auth Identities which you want to inspect. For example, you may have a list of identity IDs fetched from Access Control Lists on Globus Endpoints. In order to display these identities to an end user, you may want to resolve them to usernames. However, naively looking up the identities one-by-one is very inefficient. It's best to do batched lookups with multiple identities at once. In these cases, an ``IdentityMap`` can be used to do those batched lookups for you. An ``IdentityMap`` is a mapping-like type which converts Identity IDs and Identity Names to Identity records (dictionaries) using the Globus Auth API. .. note:: ``IdentityMap`` objects are not full Mappings in the same sense as python dicts and similar objects. By design, they only implement a small part of the Mapping protocol. The basic usage pattern is - create an ``IdentityMap`` with an AuthClient which will be used to call out to Globus Auth - seed the ``IdentityMap`` with IDs and Usernames via :py:meth:`~IdentityMap.add` (you can also do this during initialization) - retrieve identity IDs or Usernames from the map Because the map can be populated with a collection of identity IDs and Usernames prior to lookups being performed, it can improve the efficiency of these operations up to 100x over individual lookups. If you attempt to retrieve an identity which has not been previously added to the map, it will be immediately added. But adding many identities beforehand will improve performance. The ``IdentityMap`` will cache its results so that repeated lookups of the same Identity will not repeat work. It will also map identities both by ID and by Username, regardless of how they're initially looked up. .. warning:: If an Identity is not found in Globus Auth, it will trigger a KeyError when looked up. Your code must be ready to handle KeyErrors when doing a lookup. Correct usage looks something like so:: ac = globus_sdk.AuthClient(...) idmap = globus_sdk.IdentityMap( ac, ["", ""] ) idmap.add("") # adding by ID is also valid idmap.add("c699d42e-d274-11e5-bf75-1fc5bf53bb24") # map ID to username assert ( idmap["c699d42e-d274-11e5-bf75-1fc5bf53bb24"]["username"] == "" ) # map username to ID assert ( idmap[""]["id"] == "c699d42e-d274-11e5-bf75-1fc5bf53bb24" ) And simple handling of errors:: try: record = idmap[""] except KeyError: username = "NO_SUCH_IDENTITY" else: username = record["username"] or you may achieve this by using the :py:meth:`~.IdentityMap.get` method:: # internally handles the KeyError and returns the default value record = idmap.get("", None) username = record["username"] if record is not None else "NO_SUCH_IDENTITY" :param auth_client: The client object which will be used for lookups against Globus Auth :type auth_client: :class:`AuthClient <globus_sdk.AuthClient>` :param identity_ids: A list or other iterable of usernames or identity IDs (potentially mixed together) which will be used to seed the ``IdentityMap`` 's tracking of unresolved Identities. :type identity_ids: iterable of str, optional :param id_batch_size: A non-default batch size to use when communicating with Globus Auth. Leaving this set to the default is strongly recommended. :type id_batch_size: int, optional :param cache: A dict or other mapping object which will be used to cache results. The default is that results are cached once per IdentityMap object. If you want multiple IdentityMaps to share data, explicitly pass the same ``cache`` to both. :type cache: MutableMapping, optional .. automethodlist:: globus_sdk.IdentityMap :include_methods: __getitem__,__delitem__ """ # noqa _default_id_batch_size = 100
[docs] def __init__( self, auth_client: AuthClient, identity_ids: t.Iterable[str] | None = None, *, id_batch_size: int | None = None, cache: None | (t.MutableMapping[str, dict[str, t.Any]]) = None, ): self.auth_client = auth_client self.id_batch_size = id_batch_size or self._default_id_batch_size # uniquify, copy, and split into IDs vs usernames self.unresolved_ids, self.unresolved_usernames = split_ids_and_usernames( [] if identity_ids is None else identity_ids ) # a cache may be passed in via the constructor in order to make multiple # IdentityMap objects share a cache self._cache = cache if cache is not None else {}
def _create_batch(self, key: str) -> set[str]: """ Create a batch to do a lookup. For whichever set of unresolved names is appropriate, build the batch to lookup up to *at most* the batch size. Also, remove the unresolved names from tracking so that they will not be looked up again. """ key_is_username = is_username(key) set_to_use = ( self.unresolved_usernames if key_is_username else self.unresolved_ids ) # start the batch with the key being looked up, and if it is in the unresolved # list remove it batch = {key} if key in set_to_use: set_to_use.remove(key) # until we've exhausted the set or filled the batch, keep trying to add while set_to_use and len(batch) < self.id_batch_size: value = set_to_use.pop() # value may already have been looked up if the cache is shared, skip those if value in self._cache: continue batch.add(value) return batch def _fetch_batch_including(self, key: str) -> None: """ Batch resolve identifiers (usernames or IDs), being sure to include the desired, named key. The key also determines which kind of batch will be built -- usernames or IDs. Store the results in the internal cache. """ batch = self._create_batch(key) if is_username(key): response = self.auth_client.get_identities(usernames=batch) else: response = self.auth_client.get_identities(ids=batch) for x in response["identities"]: self._cache[x["id"]] = x self._cache[x["username"]] = x
[docs] def add(self, identity_id: str) -> bool: """ Add a username or ID to the ``IdentityMap`` for batch lookups later. Returns True if the ID was added for lookup. Returns False if it was rejected as a duplicate of an already known name. :param identity_id: A string Identity ID or Identity Name (a.k.a. "username") to add :type identity_id: str """ if identity_id in self._cache: return False if is_username(identity_id): if identity_id in self.unresolved_usernames: return False else: self.unresolved_usernames.add(identity_id) return True if identity_id in self.unresolved_ids: return False self.unresolved_ids.add(identity_id) return True
[docs] def get(self, key: str, default: t.Any | None = None) -> t.Any: """ A dict-like get() method which accepts a default value. """ try: return self[key] except KeyError: return default
[docs] def __getitem__(self, key: str) -> t.Any: """ ``IdentityMap`` supports dict-like lookups with ``map[key]`` """ if key not in self._cache: self._fetch_batch_including(key) return self._cache[key]
[docs] def __delitem__(self, key: str) -> None: """ ``IdentityMap`` supports ``del map[key]``. Note that this only removes lookup values from the cache and will not impact the set of unresolved/pending IDs. """ del self._cache[key]