from __future__ import annotations
import typing as t
import uuid
from .client import AuthClient
def is_username(val: str) -> bool:
"""
If the value parses as a UUID, then it's an ID, not a username.
If it does not parse as such, then it must be a username.
"""
try:
uuid.UUID(val)
return False
except ValueError:
return True
def split_ids_and_usernames(
identity_ids: t.Iterable[str],
) -> tuple[set[str], set[str]]:
ids = set()
usernames = set()
for val in identity_ids:
if is_username(val):
usernames.add(val)
else:
ids.add(val)
return ids, usernames
[docs]class IdentityMap:
r"""
There's a common pattern of having a large batch of Globus Auth Identities which you
want to inspect. For example, you may have a list of identity IDs fetched from
Access Control Lists on Globus Endpoints. In order to display these identities to an
end user, you may want to resolve them to usernames.
However, naively looking up the identities one-by-one is very inefficient. It's best
to do batched lookups with multiple identities at once. In these cases, an
``IdentityMap`` can be used to do those batched lookups for you.
An ``IdentityMap`` is a mapping-like type which converts Identity IDs and Identity
Names to Identity records (dictionaries) using the Globus Auth API.
.. note::
``IdentityMap`` objects are not full Mappings in the same sense as python dicts
and similar objects. By design, they only implement a small part of the Mapping
protocol.
The basic usage pattern is
- create an ``IdentityMap`` with an AuthClient which will be used to call out to
Globus Auth
- seed the ``IdentityMap`` with IDs and Usernames via :py:meth:`~IdentityMap.add` (you
can also do this during initialization)
- retrieve identity IDs or Usernames from the map
Because the map can be populated with a collection of identity IDs and Usernames
prior to lookups being performed, it can improve the efficiency of these operations
up to 100x over individual lookups.
If you attempt to retrieve an identity which has not been previously added to the
map, it will be immediately added. But adding many identities beforehand will
improve performance.
The ``IdentityMap`` will cache its results so that repeated lookups of the same Identity
will not repeat work. It will also map identities both by ID and by Username,
regardless of how they're initially looked up.
.. warning::
If an Identity is not found in Globus Auth, it will trigger a KeyError when
looked up. Your code must be ready to handle KeyErrors when doing a lookup.
Correct usage looks something like so::
ac = globus_sdk.AuthClient(...)
idmap = globus_sdk.IdentityMap(
ac, ["foo@globusid.org", "bar@uchicago.edu"]
)
idmap.add("baz@xsede.org")
# adding by ID is also valid
idmap.add("c699d42e-d274-11e5-bf75-1fc5bf53bb24")
# map ID to username
assert (
idmap["c699d42e-d274-11e5-bf75-1fc5bf53bb24"]["username"]
== "go@globusid.org"
)
# map username to ID
assert (
idmap["go@globusid.org"]["id"]
== "c699d42e-d274-11e5-bf75-1fc5bf53bb24"
)
And simple handling of errors::
try:
record = idmap["no-such-valid-id@example.org"]
except KeyError:
username = "NO_SUCH_IDENTITY"
else:
username = record["username"]
or you may achieve this by using the :py:meth:`~.IdentityMap.get` method::
# internally handles the KeyError and returns the default value
record = idmap.get("no-such-valid-id@example.org", None)
username = record["username"] if record is not None else "NO_SUCH_IDENTITY"
:param auth_client: The client object which will be used for lookups against Globus Auth
:type auth_client: :class:`AuthClient <globus_sdk.AuthClient>`
:param identity_ids: A list or other iterable of usernames or identity IDs (potentially
mixed together) which will be used to seed the ``IdentityMap`` 's tracking of
unresolved Identities.
:type identity_ids: iterable of str, optional
:param id_batch_size: A non-default batch size to use when communicating with Globus
Auth. Leaving this set to the default is strongly recommended.
:type id_batch_size: int, optional
:param cache: A dict or other mapping object which will be used to cache results.
The default is that results are cached once per IdentityMap object. If you want
multiple IdentityMaps to share data, explicitly pass the same ``cache`` to both.
:type cache: MutableMapping, optional
.. automethodlist:: globus_sdk.IdentityMap
:include_methods: __getitem__,__delitem__
""" # noqa
_default_id_batch_size = 100
[docs] def __init__(
self,
auth_client: AuthClient,
identity_ids: t.Iterable[str] | None = None,
*,
id_batch_size: int | None = None,
cache: None | (t.MutableMapping[str, dict[str, t.Any]]) = None,
):
self.auth_client = auth_client
self.id_batch_size = id_batch_size or self._default_id_batch_size
# uniquify, copy, and split into IDs vs usernames
self.unresolved_ids, self.unresolved_usernames = split_ids_and_usernames(
[] if identity_ids is None else identity_ids
)
# a cache may be passed in via the constructor in order to make multiple
# IdentityMap objects share a cache
self._cache = cache if cache is not None else {}
def _create_batch(self, key: str) -> set[str]:
"""
Create a batch to do a lookup.
For whichever set of unresolved names is appropriate, build the batch to
lookup up to *at most* the batch size. Also, remove the unresolved names from
tracking so that they will not be looked up again.
"""
key_is_username = is_username(key)
set_to_use = (
self.unresolved_usernames if key_is_username else self.unresolved_ids
)
# start the batch with the key being looked up, and if it is in the unresolved
# list remove it
batch = {key}
if key in set_to_use:
set_to_use.remove(key)
# until we've exhausted the set or filled the batch, keep trying to add
while set_to_use and len(batch) < self.id_batch_size:
value = set_to_use.pop()
# value may already have been looked up if the cache is shared, skip those
if value in self._cache:
continue
batch.add(value)
return batch
def _fetch_batch_including(self, key: str) -> None:
"""
Batch resolve identifiers (usernames or IDs), being sure to include the desired,
named key. The key also determines which kind of batch will be built --
usernames or IDs.
Store the results in the internal cache.
"""
batch = self._create_batch(key)
if is_username(key):
response = self.auth_client.get_identities(usernames=batch)
else:
response = self.auth_client.get_identities(ids=batch)
for x in response["identities"]:
self._cache[x["id"]] = x
self._cache[x["username"]] = x
[docs] def add(self, identity_id: str) -> bool:
"""
Add a username or ID to the ``IdentityMap`` for batch lookups later.
Returns True if the ID was added for lookup.
Returns False if it was rejected as a duplicate of an already known name.
:param identity_id: A string Identity ID or Identity Name (a.k.a. "username") to
add
:type identity_id: str
"""
if identity_id in self._cache:
return False
if is_username(identity_id):
if identity_id in self.unresolved_usernames:
return False
else:
self.unresolved_usernames.add(identity_id)
return True
if identity_id in self.unresolved_ids:
return False
self.unresolved_ids.add(identity_id)
return True
[docs] def get(self, key: str, default: t.Any | None = None) -> t.Any:
"""
A dict-like get() method which accepts a default value.
"""
try:
return self[key]
except KeyError:
return default
[docs] def __getitem__(self, key: str) -> t.Any:
"""
``IdentityMap`` supports dict-like lookups with ``map[key]``
"""
if key not in self._cache:
self._fetch_batch_including(key)
return self._cache[key]
[docs] def __delitem__(self, key: str) -> None:
"""
``IdentityMap`` supports ``del map[key]``. Note that this only removes lookup
values from the cache and will not impact the set of unresolved/pending IDs.
"""
del self._cache[key]