Source code for globus_sdk.paging.next_token
from typing import Any, Callable, Dict, Iterator, List, Optional
from .base import PageT, Paginator
[docs]class NextTokenPaginator(Paginator[PageT]):
"""
A paginator which uses `next_token` from payloads to set the `next_token`
query param to page.
Very similar to GCS's marker paginator, but only used for Transfer's
get_shared_endpoint_list
"""
def __init__(
self,
method: Callable[..., Any],
*,
items_key: Optional[str] = None,
client_args: List[Any],
client_kwargs: Dict[str, Any]
):
super().__init__(
method,
items_key=items_key,
client_args=client_args,
client_kwargs=client_kwargs,
)
self.next_token: Optional[str] = None
[docs] def pages(self) -> Iterator[PageT]:
has_next_page = True
while has_next_page:
if self.next_token:
self.client_kwargs["next_token"] = self.next_token
current_page = self.method(*self.client_args, **self.client_kwargs)
yield current_page
self.next_token = current_page.get("next_token")
has_next_page = current_page.get("next_token") is not None