
Add limit to list workflows #698

Merged: 5 commits, Dec 6, 2024
Changes from 3 commits
18 changes: 17 additions & 1 deletion temporalio/client.py
@@ -792,6 +792,7 @@ def list_workflows(
self,
query: Optional[str] = None,
*,
limit: Optional[int] = None,
page_size: int = 1000,
next_page_token: Optional[bytes] = None,
rpc_metadata: Mapping[str, str] = {},
@@ -806,6 +807,10 @@ def list_workflows(
query: A Temporal visibility list filter. See Temporal documentation
concerning visibility list filters including behavior when left
unset.
limit: Maximum number of workflows to return. If unset, all
workflows are returned. Only applies if using the
returned :py:class:`WorkflowExecutionAsyncIterator`
as an async iterator.
page_size: Maximum number of results for each page.
next_page_token: A previously obtained next page token if doing
pagination. Usually not needed as the iterator automatically
@@ -824,6 +829,7 @@
next_page_token=next_page_token,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
limit=limit,
)
)

@@ -2483,6 +2489,8 @@ def __init__(
self._next_page_token = input.next_page_token
self._current_page: Optional[Sequence[WorkflowExecution]] = None
self._current_page_index = 0
self._limit = input.limit
self._yielded = 0

@property
def current_page_index(self) -> int:
@@ -2508,10 +2516,14 @@ async def fetch_next_page(self, *, page_size: Optional[int] = None) -> None:
page_size: Override the page size this iterator was originally
created with.
"""
page_size = page_size or self._input.page_size
if self._limit is not None and self._limit - self._yielded < page_size:
@cretz (Member) commented on Dec 5, 2024:
The reason we made fetch_next_page public is that technically people don't have to use async iteration; they can paginate with it themselves. For users paginating that way, limit will not work. I am unsure whether we want to make it work or just say it only applies when using it as an async iterable.

One option is to instead offer a general Python helper for limiting async iterables (though that doesn't help with the page size). But we could also just document that this field only works when used as an async iterable.

Member Author replied:
Hmm, honestly I'd prefer to just un-pub it but I suppose that's not really an option. I'll update the docstring to say it only applies to the iterable, since if you're doing it yourself ostensibly you don't care anyway.

@cretz (Member) replied on Dec 6, 2024:

Works for me. The reason behind making it public is that I suspected people would want to use it outside of an async for but be afraid to invoke __anext__ themselves and handle iteration stop and such. Async iterables in Python are not as easy to use outside of loop constructs as, say, Java streams or .NET async enumerables.

page_size = self._limit - self._yielded

resp = await self._client.workflow_service.list_workflow_executions(
temporalio.api.workflowservice.v1.ListWorkflowExecutionsRequest(
namespace=self._client.namespace,
page_size=page_size or self._input.page_size,
page_size=page_size,
next_page_token=self._next_page_token or b"",
query=self._input.query or "",
),
@@ -2534,6 +2546,8 @@ async def __anext__(self) -> WorkflowExecution:
"""Get the next execution on this iterator, fetching next page if
necessary.
"""
if self._limit is not None and self._yielded >= self._limit:
raise StopAsyncIteration
while True:
# No page? fetch and continue
if self._current_page is None:
@@ -2551,6 +2565,7 @@
# Get current, increment page index, and return
ret = self._current_page[self._current_page_index]
self._current_page_index += 1
self._yielded += 1
return ret

async def map_histories(
@@ -4573,6 +4588,7 @@ class ListWorkflowsInput:
next_page_token: Optional[bytes]
rpc_metadata: Mapping[str, str]
rpc_timeout: Optional[timedelta]
limit: Optional[int]


@dataclass
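The general helper floated in the review thread above (limiting any async iterable, rather than baking limit into the iterator) could be sketched roughly as follows. This is illustrative only and not part of the PR or the SDK; the name `limit_async_iterable` is hypothetical, and as noted in the thread, a helper like this would not shrink the page size of the underlying RPCs:

```python
from typing import AsyncIterable, AsyncIterator, TypeVar

T = TypeVar("T")


async def limit_async_iterable(
    iterable: AsyncIterable[T], limit: int
) -> AsyncIterator[T]:
    # Yield at most `limit` items from any async iterable, then stop.
    if limit <= 0:
        return
    yielded = 0
    async for item in iterable:
        yield item
        yielded += 1
        if yielded >= limit:
            return


async def numbers(n: int) -> AsyncIterator[int]:
    # A toy async generator standing in for a workflow listing.
    for i in range(n):
        yield i
```

Usage would look like `[x async for x in limit_async_iterable(numbers(10), 3)]`, which yields only the first three items; the same pattern would apply to any async iterator, including the one returned by list_workflows.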
14 changes: 14 additions & 0 deletions tests/test_client.py
@@ -618,6 +618,20 @@ async def test_list_workflows_and_fetch_history(
)
assert actual_id_and_input == expected_id_and_input

# Verify listing can limit results
limited = [
w async for w in client.list_workflows(f"WorkflowId = '{workflow_id}'", limit=3)
]
assert len(limited) == 3
# With a weird page size
limited = [
w
async for w in client.list_workflows(
f"WorkflowId = '{workflow_id}'", page_size=2, limit=3
)
]
assert len(limited) == 3
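The "weird page size" case above exercises the clamping added to fetch_next_page: each fetch asks for page_size items unless fewer than that remain under limit. A standalone sketch of that arithmetic (the helper name is hypothetical, not part of the SDK):

```python
def requested_page_sizes(total: int, page_size: int, limit: int) -> list[int]:
    # Mirror the iterator's clamping: each fetch requests `page_size`
    # unless fewer than that many items remain under `limit`.
    sizes: list[int] = []
    yielded = 0
    target = min(total, limit)
    while yielded < target:
        size = page_size
        if limit - yielded < size:
            size = limit - yielded
        sizes.append(size)
        # The server returns at most `size` items (fewer near the end).
        yielded += min(size, target - yielded)
    return sizes
```

With `page_size=2, limit=3` as in the test, the requests are for 2 and then 1 items, so exactly 3 results come back and the final RPC never over-fetches.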


@workflow.defn
class CountableWorkflow: