Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,25 @@ def list_workflows(query = nil, rpc_options: nil)
@impl.list_workflows(Interceptor::ListWorkflowsInput.new(query:, rpc_options:))
end

# List workflows one page at a time.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param next_page_token [String, nil] Token for the next page of results. If not set, the first page is returned.
# @param page_size [Integer, nil] Maximum number of results to return.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Interceptor::ListWorkflowPageOutput] Enumerable workflow executions, with a #next_page_token method.
#
# @raise [Error::RPCError] RPC error from call.
#
# @see https://docs.temporal.io/visibility
def list_workflow_page(query = nil, next_page_token: nil, page_size: nil, rpc_options: nil)
@impl.list_workflow_page(Interceptor::ListWorkflowPageInput.new(query:,
next_page_token:,
page_size:,
rpc_options:))
end

# Count workflows.
#
# @param query [String, nil] A Temporal visibility list filter.
Expand Down
25 changes: 25 additions & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ def intercept_client(next_interceptor)
:rpc_options
)

# Input for {Outbound.list_workflows}.
ListWorkflowPageInput = Data.define(
:query,
:next_page_token,
:page_size,
:rpc_options
)

# Output for {Outbound.list_workflow_page}.
ListWorkflowPageOutput = Data.define(:executions, :next_page_token) do
include Enumerable

def each(...) # rubocop:disable Style/DocumentationMethod
executions.each(...)
end
end

# Input for {Outbound.count_workflows}.
CountWorkflowsInput = Data.define(
:query,
Expand Down Expand Up @@ -289,6 +306,14 @@ def list_workflows(input)
next_interceptor.list_workflows(input)
end

# Called for every {Client.list_workflow_page} call.
#
# @param input [ListWorkflowPageInput] Input.
# @return [Interceptor::ListWorkflowPageOutput] Enumerable workflow executions, with a #next_page_token method.
def list_workflow_page(input)
next_interceptor.list_workflow_page(input)
end

# Called for every {Client.count_workflows} call.
#
# @param input [CountWorkflowsInput] Input.
Expand Down
45 changes: 31 additions & 14 deletions temporalio/lib/temporalio/internal/client/implementation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -327,26 +327,43 @@ def _start_workflow_request_from_with_start_options(klass, start_options)
end

def list_workflows(input)
next_page_token = nil
list_workflow_page_input = Temporalio::Client::Interceptor::ListWorkflowPageInput.new(
query: input.query,
rpc_options: input.rpc_options,
next_page_token: next_page_token,
page_size: nil
)
Enumerator.new do |yielder|
req = Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new(
namespace: @client.namespace,
query: input.query || ''
)
loop do
resp = @client.workflow_service.list_workflow_executions(
req,
rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
)
resp.executions.each do |raw_info|
yielder << Temporalio::Client::WorkflowExecution.new(raw_info, @client.data_converter)
end
break if resp.next_page_token.empty?

req.next_page_token = resp.next_page_token
page = @client._impl.list_workflow_page(list_workflow_page_input.with(next_page_token: next_page_token))
page.each { |execution| yielder << execution }
next_page_token = page.next_page_token
break if next_page_token.empty?
end
end
end

def list_workflow_page(input)
req = Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new(
namespace: @client.namespace,
query: input.query || '',
next_page_token: input.next_page_token,
page_size: input.page_size
)
resp = @client.workflow_service.list_workflow_executions(
req,
rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
)
executions = resp.executions.map do |raw_info|
Temporalio::Client::WorkflowExecution.new(raw_info, @client.data_converter)
end
Temporalio::Client::Interceptor::ListWorkflowPageOutput.new(
executions: executions,
next_page_token: resp.next_page_token
)
end

def count_workflows(input)
resp = @client.workflow_service.count_workflow_executions(
Api::WorkflowService::V1::CountWorkflowExecutionsRequest.new(
Expand Down
7 changes: 7 additions & 0 deletions temporalio/sig/temporalio/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ module Temporalio
?rpc_options: RPCOptions?
) -> Enumerator[WorkflowExecution, WorkflowExecution]

def list_workflow_page: (
?String query,
?next_page_token: String?,
?page_size: Integer?,
?rpc_options: RPCOptions?
) -> Interceptor::ListWorkflowPageOutput

def count_workflows: (
?String query,
?rpc_options: RPCOptions?
Expand Down
26 changes: 26 additions & 0 deletions temporalio/sig/temporalio/client/interceptor.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,32 @@ module Temporalio
) -> void
end

class ListWorkflowPageInput
attr_reader query: String?
attr_reader next_page_token: String?
attr_reader page_size: Integer?
attr_reader rpc_options: RPCOptions?

def initialize: (
query: String?,
next_page_token: String?,
page_size: Integer?,
rpc_options: RPCOptions?
) -> void
end

class ListWorkflowPageOutput
include Enumerable[WorkflowExecution, WorkflowExecution]

attr_reader executions: Array[WorkflowExecution]
attr_reader next_page_token: String?

def initialize: (
executions: Array[WorkflowExecution],
next_page_token: String?
) -> void
end

class CountWorkflowsInput
attr_reader query: String?
attr_reader rpc_options: RPCOptions?
Expand Down
7 changes: 6 additions & 1 deletion temporalio/test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ def list_workflows(input)
super
end

def list_workflow_page(input)
@root.calls.push(['list_workflow_page', input])
super
end

def count_workflows(input)
@root.calls.push(['count_workflows', input])
super
Expand Down Expand Up @@ -141,7 +146,7 @@ def test_interceptor
track.calls.clear
assert_empty client.list_workflows("WorkflowType = 'test-interceptor-does-not-exist'").to_a
assert_equal 0, client.count_workflows("WorkflowType = 'test-interceptor-does-not-exist'").count
assert_equal(%w[list_workflows count_workflows], track.calls.map(&:first))
assert_equal(%w[list_workflows list_workflow_page count_workflows], track.calls.map(&:first))
end
end
end
15 changes: 15 additions & 0 deletions temporalio/test/client_workflow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,21 @@ def test_list_and_count
assert_equal 2, wfs.size
end

# Paginated list workflows query
assert_eventually do
wfs = env.client.list_workflow_page("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'", page_size: 2)
assert_equal 2, wfs.count
all_wfs = wfs.to_a
# Check next page
next_page = wfs.next_page_token
wfs = env.client.list_workflow_page("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'",
next_page_token: next_page).to_a
assert_equal 3, wfs.count
all_wfs += wfs.to_a
# Check all IDs are present
assert_equal handles.map(&:id).sort, all_wfs.map(&:id).sort
end

# Normal count
assert_eventually do
count = env.client.count_workflows("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'")
Expand Down
Loading