from__future__importannotationsimporttypingastimportrequestsfrom.baseimportGlobusError# Wrappers around requests exceptions, so the SDK is somewhat independent from details# about requests
class NetworkError(GlobusError):
    """
    Raised when communication with the REST API server fails.

    The original exception is retained on the instance, and a separate
    message is accepted so that confusing or inconsistent underlying
    exceptions can be explained to the caller.

    :param msg: Explanatory message, handed to the parent initializer
    :param exc: The original exception, stored as ``underlying_exception``
    :param args: Extra positional arguments; accepted but not used here
    :param kwargs: Extra keyword arguments; accepted but not used here
    """

    def __init__(
        self,
        msg: str,
        exc: Exception,
        *args: t.Any,
        **kwargs: t.Any,
    ) -> None:
        # Only the message is forwarded to the parent class.
        super().__init__(msg)
        # Keep the wrapped exception reachable for callers who need details.
        self.underlying_exception = exc
class GlobusConnectionTimeoutError(GlobusTimeoutError):
    """
    A timeout which occurred while the connection was being established.

    Errors of this kind are safe to retry.
    """
class GlobusConnectionError(NetworkError):
    """
    Raised when a connection error occurs while making a REST request.
    """
def convert_request_exception(exc: requests.RequestException) -> GlobusError:
    """
    Wrap an exception raised by ``requests`` in the matching Globus
    network error class.

    The first matching type in the lookup order below decides the wrapper;
    anything that matches none of them is wrapped in a plain
    ``NetworkError``.

    :param exc: The exception to "convert" by wrapping it
    """
    # Order is significant: the first match wins, so the connect-timeout
    # check has to come before the more general timeout and connection checks.
    wrappers = (
        (
            requests.ConnectTimeout,
            GlobusConnectionTimeoutError,
            "ConnectTimeoutError on request",
        ),
        (
            requests.Timeout,
            GlobusTimeoutError,
            "TimeoutError on request",
        ),
        (
            requests.ConnectionError,
            GlobusConnectionError,
            "ConnectionError on request",
        ),
    )

    for requests_type, wrapper_cls, message in wrappers:
        if isinstance(exc, requests_type):
            return wrapper_cls(message, exc)

    # Fallback for any other requests exception.
    return NetworkError("NetworkError on request", exc)