# Source code for globus_sdk.token_storage.validating_token_storage.storage

from __future__ import annotations

import typing as t

import globus_sdk

from ..base import TokenStorage
from ..token_data import TokenStorageData
from .context import TokenValidationContext
from .errors import MissingTokenError
from .validators import TokenDataValidator


class ValidatingTokenStorage(TokenStorage):
    """
    A token storage wrapper which runs validation hooks over token data.

    Store, get, and remove requests are passed through to a wrapped
    ``TokenStorage``; the configured validators are invoked before data is
    stored and after data is retrieved.

    See :class:`TokenDataValidator` for details on hook specifics.
    See :class:`TokenStorage` for common interface details.

    :param token_storage: A proxy token storage for this class to pass through
        store, get, and remove requests to.
    :param validators: A collection of validation hooks to call.

    :ivar str | None identity_id: The primary identity ID of the entity which
        granted tokens, if known.
    """

    def __init__(
        self,
        token_storage: TokenStorage,
        *,
        validators: t.Iterable[TokenDataValidator] = (),
    ) -> None:
        self.token_storage = token_storage
        self.validators: list[TokenDataValidator] = [*validators]

        # Seed the known identity from whatever the wrapped storage already
        # holds. This read deliberately bypasses the validation hooks.
        existing_token_data = token_storage.get_token_data_by_resource_server()
        self.identity_id = _identity_id_from_token_data(existing_token_data)

        super().__init__(namespace=token_storage.namespace)

    def close(self) -> None:
        """Close the wrapped storage; this wrapper holds nothing else to close."""
        self.token_storage.close()

    def _make_context(
        self, token_data_by_resource_server: t.Mapping[str, TokenStorageData]
    ) -> TokenValidationContext:
        """
        Create the ``TokenValidationContext`` handed to validators.

        The current ``self.identity_id`` is captured as ``prior_identity_id``
        *before* any update is applied. Afterwards, if no identity was known
        yet, ``self.identity_id`` is set from the context's
        ``token_data_identity_id``.

        :param token_data_by_resource_server: The token data being stored or
            retrieved, indexed by resource server.
        :returns: The context describing the prior and incoming identity IDs.
        """
        prior_identity_id = self.identity_id
        incoming_identity_id = _identity_id_from_token_data(
            token_data_by_resource_server
        )
        context = TokenValidationContext(
            prior_identity_id=prior_identity_id,
            token_data_identity_id=incoming_identity_id,
        )

        # Only adopt an identity when none was known; an existing value is
        # never overwritten here.
        if self.identity_id is None:
            self.identity_id = context.token_data_identity_id
        return context

    def store_token_data_by_resource_server(
        self, token_data_by_resource_server: t.Mapping[str, TokenStorageData]
    ) -> None:
        """
        Run every ``before_store`` hook, then store via the wrapped storage.

        :param token_data_by_resource_server: A dict of TokenStorageData objects
            indexed by their resource server
        """
        context = self._make_context(token_data_by_resource_server)
        for hook in self.validators:
            hook.before_store(token_data_by_resource_server, context)

        self.token_storage.store_token_data_by_resource_server(
            token_data_by_resource_server
        )

    def get_token_data(self, resource_server: str) -> TokenStorageData:
        """
        Fetch token data for one resource server and run the ``after_retrieve``
        hooks over it.

        :param resource_server: A resource server with cached token data.
        :returns: The token data for the given resource server.
        :raises: :exc:`MissingTokenError` if the underlying ``TokenStorage`` does
            not have any token data for the given resource server.
        """
        found = self.token_storage.get_token_data(resource_server)
        if found is None:
            raise MissingTokenError(
                f"No token data for {resource_server}",
                resource_server=resource_server,
            )

        # Validators operate on mappings, so wrap the single item in one,
        # keyed by the resource server recorded on the token data itself.
        retrieved = {found.resource_server: found}
        context = self._make_context(retrieved)
        for hook in self.validators:
            hook.after_retrieve(retrieved, context)

        return found

    def get_token_data_by_resource_server(self) -> dict[str, TokenStorageData]:
        """
        Fetch all token data from the wrapped storage, run every
        ``after_retrieve`` hook over it, and return it.

        :returns: The mapping produced by the wrapped storage's
            ``get_token_data_by_resource_server``.
        """
        retrieved = self.token_storage.get_token_data_by_resource_server()
        context = self._make_context(retrieved)
        for hook in self.validators:
            hook.after_retrieve(retrieved, context)
        return retrieved

    def remove_token_data(self, resource_server: str) -> bool:
        """
        Remove token data via the wrapped storage; no validators are invoked.

        :param resource_server: The resource server string to remove token data
            for
        :returns: Whatever the wrapped storage's ``remove_token_data`` returns.
        """
        return self.token_storage.remove_token_data(resource_server)

    def _extract_identity_id(
        self, token_response: globus_sdk.OAuthTokenResponse
    ) -> str | None:
        """
        Override determination of the identity_id for a token response.

        For a refresh token response, the stored ``identity_id`` is returned
        (which may be ``None``). For any other response, the lookup is
        delegated to the inner token storage.

        :param token_response: The token response to determine an identity for.
        """
        if not isinstance(token_response, globus_sdk.OAuthRefreshTokenResponse):
            return self.token_storage._extract_identity_id(token_response)
        return self.identity_id


def _identity_id_from_token_data(
    token_data_by_resource_server: t.Mapping[str, TokenStorageData],
) -> str | None:
    """
    Determine the single ``identity_id`` shared by a collection of token data.

    Entries whose ``identity_id`` is ``None`` are ignored.

    :param token_data_by_resource_server: The token data to read for identity_id.
    :returns: The one distinct ``identity_id`` found, or ``None`` if no entry
        carries one.
    :raises ValueError: if there is inconsistent ``identity_id`` information
    """
    # Gather the distinct non-None identity IDs across all entries.
    token_data_identity_ids: set[str] = set()
    for entry in token_data_by_resource_server.values():
        if entry.identity_id is not None:
            token_data_identity_ids.add(entry.identity_id)

    if not token_data_identity_ids:
        return None

    if len(token_data_identity_ids) > 1:
        raise ValueError(
            "token_data_by_resource_server contained TokenStorageData objects with "
            f"different identity_id values: {token_data_identity_ids}"
        )

    # Exactly one distinct value remains.
    (identity_id,) = token_data_identity_ids
    return identity_id