Skip to content

Commit ec7212b

Browse files
authored
Pagination support for listing workflows through a new Client#list_workflow_page method (#285)
1 parent cc5b393 commit ec7212b

File tree

7 files changed

+111
-31
lines changed

7 files changed

+111
-31
lines changed

temporalio/lib/temporalio/client.rb

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ class Client
5050
# Options as returned from {options} for +**to_h+ splat use in {initialize}. See {initialize} for details.
5151
class Options; end # rubocop:disable Lint/EmptyClass
5252

53+
ListWorkflowPage = Data.define(:executions, :next_page_token)
54+
55+
# A page of workflow executions returned by {Client#list_workflow_page}.
56+
#
57+
# @!attribute executions
58+
# @return [Array<WorkflowExecution>] List of workflow executions in this page.
59+
# @!attribute next_page_token
60+
# @return [String, nil] Token for the next page of results. nil if there are no more results.
61+
class ListWorkflowPage; end # rubocop:disable Lint/EmptyClass
62+
5363
# Connect to Temporal server. This is a shortcut for +Connection.new+ followed by +Client.new+.
5464
#
5565
# @param target_host [String] +host:port+ for the Temporal server. For local development, this is often
@@ -486,7 +496,40 @@ def signal_with_start_workflow(
486496
#
487497
# @see https://docs.temporal.io/visibility
488498
def list_workflows(query = nil, rpc_options: nil)
489-
@impl.list_workflows(Interceptor::ListWorkflowsInput.new(query:, rpc_options:))
499+
next_page_token = nil
500+
Enumerator.new do |yielder|
501+
loop do
502+
list_workflow_page_input = Interceptor::ListWorkflowPageInput.new(
503+
query: query,
504+
rpc_options: rpc_options,
505+
next_page_token: next_page_token,
506+
page_size: nil
507+
)
508+
page = @impl.list_workflow_page(list_workflow_page_input)
509+
page.executions.each { |execution| yielder << execution }
510+
next_page_token = page.next_page_token
511+
break if (next_page_token || '').empty?
512+
end
513+
end
514+
end
515+
516+
# List workflows one page at a time.
517+
#
518+
# @param query [String, nil] A Temporal visibility list filter.
519+
# @param page_size [Integer, nil] Maximum number of results to return.
520+
# @param next_page_token [String, nil] Token for the next page of results. If not set, the first page is returned.
521+
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
522+
#
523+
# @return [ListWorkflowPage] Page of workflow executions, along with a next_page_token to keep fetching.
524+
#
525+
# @raise [Error::RPCError] RPC error from call.
526+
#
527+
# @see https://docs.temporal.io/visibility
528+
def list_workflow_page(query = nil, page_size: nil, next_page_token: nil, rpc_options: nil)
529+
@impl.list_workflow_page(Interceptor::ListWorkflowPageInput.new(query:,
530+
next_page_token:,
531+
page_size:,
532+
rpc_options:))
490533
end
491534

492535
# Count workflows.

temporalio/lib/temporalio/client/interceptor.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ def intercept_client(next_interceptor)
6262
:rpc_options
6363
)
6464

65-
# Input for {Outbound.list_workflows}.
66-
ListWorkflowsInput = Data.define(
65+
# Input for {Outbound.list_workflow_page}.
66+
ListWorkflowPageInput = Data.define(
6767
:query,
68+
:next_page_token,
69+
:page_size,
6870
:rpc_options
6971
)
7072

@@ -281,12 +283,12 @@ def signal_with_start_workflow(input)
281283
next_interceptor.signal_with_start_workflow(input)
282284
end
283285

284-
# Called for every {Client.list_workflows} call.
286+
# Called for every {Client.list_workflow_page} call.
285287
#
286-
# @param input [ListWorkflowsInput] Input.
287-
# @return [Enumerator<WorkflowExecution>] Enumerable workflow executions.
288-
def list_workflows(input)
289-
next_interceptor.list_workflows(input)
288+
# @param input [ListWorkflowPageInput] Input.
289+
# @return [Client::ListWorkflowPage] Enumerable workflow executions, with a #next_page_token method.
290+
def list_workflow_page(input)
291+
next_interceptor.list_workflow_page(input)
290292
end
291293

292294
# Called for every {Client.count_workflows} call.

temporalio/lib/temporalio/internal/client/implementation.rb

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -326,25 +326,24 @@ def _start_workflow_request_from_with_start_options(klass, start_options)
326326
)
327327
end
328328

329-
def list_workflows(input)
330-
Enumerator.new do |yielder|
331-
req = Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new(
332-
namespace: @client.namespace,
333-
query: input.query || ''
334-
)
335-
loop do
336-
resp = @client.workflow_service.list_workflow_executions(
337-
req,
338-
rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
339-
)
340-
resp.executions.each do |raw_info|
341-
yielder << Temporalio::Client::WorkflowExecution.new(raw_info, @client.data_converter)
342-
end
343-
break if resp.next_page_token.empty?
344-
345-
req.next_page_token = resp.next_page_token
346-
end
329+
def list_workflow_page(input)
330+
req = Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new(
331+
namespace: @client.namespace,
332+
query: input.query || '',
333+
next_page_token: input.next_page_token,
334+
page_size: input.page_size
335+
)
336+
resp = @client.workflow_service.list_workflow_executions(
337+
req,
338+
rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
339+
)
340+
executions = resp.executions.map do |raw_info|
341+
Temporalio::Client::WorkflowExecution.new(raw_info, @client.data_converter)
347342
end
343+
Temporalio::Client::ListWorkflowPage.new(
344+
executions: executions,
345+
next_page_token: resp.next_page_token
346+
)
348347
end
349348

350349
def count_workflows(input)

temporalio/sig/temporalio/client.rbs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ module Temporalio
2222
def with: (**untyped) -> Options
2323
end
2424

25+
class ListWorkflowPage
26+
attr_reader executions: Array[WorkflowExecution]
27+
attr_reader next_page_token: String?
28+
29+
def initialize: (
30+
executions: Array[WorkflowExecution],
31+
next_page_token: String?
32+
) -> void
33+
end
34+
2535
def self.connect: (
2636
String target_host,
2737
String namespace,
@@ -139,6 +149,13 @@ module Temporalio
139149
?rpc_options: RPCOptions?
140150
) -> Enumerator[WorkflowExecution, WorkflowExecution]
141151

152+
def list_workflow_page: (
153+
?String query,
154+
?next_page_token: String?,
155+
?page_size: Integer?,
156+
?rpc_options: RPCOptions?
157+
) -> ListWorkflowPage
158+
142159
def count_workflows: (
143160
?String query,
144161
?rpc_options: RPCOptions?

temporalio/sig/temporalio/client/interceptor.rbs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,16 @@ module Temporalio
8585
) -> void
8686
end
8787

88-
class ListWorkflowsInput
88+
class ListWorkflowPageInput
8989
attr_reader query: String?
90+
attr_reader next_page_token: String?
91+
attr_reader page_size: Integer?
9092
attr_reader rpc_options: RPCOptions?
9193

9294
def initialize: (
9395
query: String?,
96+
next_page_token: String?,
97+
page_size: Integer?,
9498
rpc_options: RPCOptions?
9599
) -> void
96100
end
@@ -412,7 +416,7 @@ module Temporalio
412416

413417
def signal_with_start_workflow: (SignalWithStartWorkflowInput input) -> WorkflowHandle
414418

415-
def list_workflows: (ListWorkflowsInput input) -> Enumerator[WorkflowExecution, WorkflowExecution]
419+
def list_workflow_page: (ListWorkflowPageInput input) -> Client::ListWorkflowPage
416420

417421
def count_workflows: (CountWorkflowsInput input) -> WorkflowExecutionCount
418422

temporalio/test/client_test.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ def start_workflow(input)
5050
super
5151
end
5252

53-
def list_workflows(input)
54-
@root.calls.push(['list_workflows', input])
53+
def list_workflow_page(input)
54+
@root.calls.push(['list_workflow_page', input])
5555
super
5656
end
5757

@@ -141,7 +141,7 @@ def test_interceptor
141141
track.calls.clear
142142
assert_empty client.list_workflows("WorkflowType = 'test-interceptor-does-not-exist'").to_a
143143
assert_equal 0, client.count_workflows("WorkflowType = 'test-interceptor-does-not-exist'").count
144-
assert_equal(%w[list_workflows count_workflows], track.calls.map(&:first))
144+
assert_equal(%w[list_workflow_page count_workflows], track.calls.map(&:first))
145145
end
146146
end
147147
end

temporalio/test/client_workflow_test.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,21 @@ def test_list_and_count
260260
assert_equal 2, wfs.size
261261
end
262262

263+
# Paginated list workflows query
264+
assert_eventually do
265+
wfs = env.client.list_workflow_page("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'", page_size: 2)
266+
assert_equal 2, wfs.executions.size
267+
all_wfs = wfs.executions
268+
# Check next page
269+
next_page = wfs.next_page_token
270+
wfs = env.client.list_workflow_page("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'",
271+
next_page_token: next_page)
272+
assert_equal 3, wfs.executions.size
273+
all_wfs += wfs.executions
274+
# Check all IDs are present
275+
assert_equal handles.map(&:id).sort, all_wfs.map(&:id).sort
276+
end
277+
263278
# Normal count
264279
assert_eventually do
265280
count = env.client.count_workflows("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'")

0 commit comments

Comments
 (0)