Source code for globus_sdk.paging.next_token

from typing import Any, Callable, Dict, Iterator, List, Optional

from globus_sdk.response import GlobusHTTPResponse

from .base import Paginator


[docs]class NextTokenPaginator(Paginator): """ A paginator which uses `next_token` from payloads to set the `next_token` query param to page. Very similar to GCS's marker paginator, but only used for Transfer's get_shared_endpoint_list """ def __init__( self, method: Callable, *, items_key: Optional[str] = None, client_args: List[Any], client_kwargs: Dict[str, Any] ): super().__init__( method, items_key=items_key, client_args=client_args, client_kwargs=client_kwargs, ) self.next_token: Optional[str] = None
[docs] def pages(self) -> Iterator[GlobusHTTPResponse]: has_next_page = True while has_next_page: if self.next_token: self.client_kwargs["next_token"] = self.next_token current_page = self.method(*self.client_args, **self.client_kwargs) yield current_page self.next_token = current_page.get("next_token") has_next_page = current_page.get("next_token") is not None