Source code for globus_sdk.paging.next_token

from __future__ import annotations

import typing as t

from .base import PageT, Paginator


[docs] class NextTokenPaginator(Paginator[PageT]): """ A paginator which uses `next_token` from payloads to set the `next_token` query param to page. Very similar to GCS's marker paginator, but only used for Transfer's get_shared_endpoint_list """ _REQUIRES_METHOD_KWARGS = ("next_token",) def __init__( self, method: t.Callable[..., t.Any], *, items_key: str | None = None, client_args: tuple[t.Any, ...], client_kwargs: dict[str, t.Any], ): super().__init__( method, items_key=items_key, client_args=client_args, client_kwargs=client_kwargs, ) self.next_token: str | None = None
[docs] def pages(self) -> t.Iterator[PageT]: has_next_page = True while has_next_page: if self.next_token: self.client_kwargs["next_token"] = self.next_token current_page = self.method(*self.client_args, **self.client_kwargs) yield current_page self.next_token = current_page.get("next_token") has_next_page = current_page.get("next_token") is not None