Source code for globus_sdk.exc.err_info

from __future__ import annotations

import logging
import typing as t

from globus_sdk import _guards

log = logging.getLogger(__name__)


[docs] class ErrorInfo: """ Errors may contain "containers" of data which are testable (define ``__bool__``). When they have data, they should ``bool()`` as ``True`` """ _has_data: bool def __bool__(self) -> bool: return self._has_data def __str__(self) -> str: if self: attrmap = ", ".join( [f"{k}={v}" for k, v in self.__dict__.items() if not k.startswith("_")] ) else: attrmap = ":" return f"{self.__class__.__name__}({attrmap})"
[docs] class AuthorizationParameterInfo(ErrorInfo): """ AuthorizationParameterInfo objects may contain information about the 'authorization_parameters' of an error. They test as truthy when the error has valid 'authorization_parameters' data. :ivar session_message: A message from the server :vartype session_message: str, optional :ivar session_required_identities: A list of identity IDs as strings which are being requested by the server :vartype session_required_identities: list of str, optional :ivar session_required_single_domain: A list of domains which are being requested by the server ("single domain" because the user should choose one) :vartype session_required_single_domain: list of str, optional :ivar session_required_policies: A list of policies required for the session. :vartype session_required_policies: list of str, optional :ivar session_required_mfa: Whether MFA is required for the session. :vartype session_required_mfa: bool, optional **Examples** >>> try: >>> ... # something >>> except GlobusAPIError as err: >>> # get a parsed AuthorizationParameterInfo object, and check if it's truthy >>> authz_params = err.info.authorization_parameters >>> if not authz_params: >>> raise >>> # whatever handling code is desired... >>> print("got authz params:", authz_params) """ def __init__(self, error_data: dict[str, t.Any]): # data is there if this key is present and it is a dict self._has_data = isinstance(error_data.get("authorization_parameters"), dict) data = t.cast( t.Dict[str, t.Any], error_data.get("authorization_parameters", {}) ) self.session_message: str | None = self._parse_session_message(data) self.session_required_identities: list[str] | None = ( self._parse_session_required_identities(data) ) self.session_required_single_domain: list[str] | None = ( self._parse_session_required_single_domain(data) ) self.session_required_policies: list[str] | None = ( self._parse_session_required_policies(data) ) self.session_required_mfa = self._parse_session_required_mfa(data) def _parse_session_message(self, data: dict[str, t.Any]) -> str | None: session_message = data.get("session_message") if isinstance(session_message, str): return session_message elif session_message is not None: self._warn_type("session_message", "str", session_message) return None def _parse_session_required_identities( self, data: dict[str, t.Any] ) -> list[str] | None: session_required_identities = data.get("session_required_identities") if _guards.is_list_of(session_required_identities, str): return session_required_identities elif session_required_identities is not None: self._warn_type( "session_required_identities", "list[str]", session_required_identities, ) return None def _parse_session_required_single_domain( self, data: dict[str, t.Any] ) -> list[str] | None: session_required_single_domain = data.get("session_required_single_domain") if _guards.is_list_of(session_required_single_domain, str): return session_required_single_domain elif session_required_single_domain is not None: self._warn_type( "session_required_single_domain", "list[str]", session_required_single_domain, ) return None def _parse_session_required_policies( self, data: dict[str, t.Any] ) -> list[str] | None: session_required_policies = data.get("session_required_policies") if isinstance(session_required_policies, str): return session_required_policies.split(",") elif _guards.is_list_of(session_required_policies, str): return session_required_policies elif session_required_policies is not None: self._warn_type( "session_required_policies", "list[str]|str", session_required_policies ) return None def _parse_session_required_mfa(self, data: dict[str, t.Any]) -> bool | None: session_required_mfa = data.get("session_required_mfa") if isinstance(session_required_mfa, bool): return session_required_mfa elif session_required_mfa is not None: self._warn_type("session_required_mfa", "bool", session_required_mfa) return None def _warn_type(self, key: str, expected: str, got: t.Any) -> None: log.warning( f"During ErrorInfo instantiation, got unexpected type for '{key}'. " f"Expected '{expected}'. Got '{got}'" )
[docs] class ConsentRequiredInfo(ErrorInfo): """ ConsentRequiredInfo objects contain required consent information for an error. They test as truthy if the error was marked as a ConsentRequired error. :ivar required_scopes: A list of scopes requested by the server :vartype required_scopes: list of str, optional """ def __init__(self, error_data: dict[str, t.Any]) -> None: # data is only considered parseable if this error has the code 'ConsentRequired' has_code = error_data.get("code") == "ConsentRequired" data = error_data if has_code else {} self.required_scopes = self._parse_required_scopes(data) # but the result is only considered valid if both parts are present self._has_data = has_code and bool(self.required_scopes) def _parse_required_scopes(self, data: dict[str, t.Any]) -> list[str]: if _guards.is_list_of(data.get("required_scopes"), str): return t.cast("list[str]", data["required_scopes"]) elif isinstance(data.get("required_scope"), str): return [data["required_scope"]] return []
[docs] class ErrorInfoContainer: """ This is a wrapper type which contains various error info objects for parsed error data. It is attached to API errors as the ``.info`` attribute. :ivar authorization_parameters: A parsed AuthorizationParameterInfo object :ivar consent_required: A parsed ConsentRequiredInfo object """ def __init__(self, error_data: dict[str, t.Any] | None) -> None: self.authorization_parameters = AuthorizationParameterInfo(error_data or {}) self.consent_required = ConsentRequiredInfo(error_data or {}) def __str__(self) -> str: return f"{self.authorization_parameters}|{self.consent_required}"