Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ class Client
# Options as returned from {options} for +**to_h+ splat use in {initialize}. See {initialize} for details.
class Options; end # rubocop:disable Lint/EmptyClass

ListWorkflowPage = Data.define(:executions, :next_page_token)

# A page of workflow executions returned by {Client#list_workflow_page}.
#
# @!attribute executions
# @return [Array<WorkflowExecution>] List of workflow executions in this page.
# @!attribute next_page_token
# @return [String, nil] Token for the next page of results. nil if there are no more results.
class ListWorkflowPage; end # rubocop:disable Lint/EmptyClass

# Connect to Temporal server. This is a shortcut for +Connection.new+ followed by +Client.new+.
#
# @param target_host [String] +host:port+ for the Temporal server. For local development, this is often
Expand Down Expand Up @@ -486,7 +496,40 @@ def signal_with_start_workflow(
#
# @see https://docs.temporal.io/visibility
def list_workflows(query = nil, rpc_options: nil)
@impl.list_workflows(Interceptor::ListWorkflowsInput.new(query:, rpc_options:))
next_page_token = nil
Enumerator.new do |yielder|
loop do
list_workflow_page_input = Interceptor::ListWorkflowPageInput.new(
query: query,
rpc_options: rpc_options,
next_page_token: next_page_token,
page_size: nil
)
page = @impl.list_workflow_page(list_workflow_page_input)
page.executions.each { |execution| yielder << execution }
next_page_token = page.next_page_token
break if (next_page_token || '').empty?
end
end
end

# List workflows one page at a time.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param page_size [Integer, nil] Maximum number of results to return.
# @param next_page_token [String, nil] Token for the next page of results. If not set, the first page is returned.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [ListWorkflowPage] Page of workflow executions, along with a next_page_token to keep fetching.
#
# @raise [Error::RPCError] RPC error from call.
#
# @see https://docs.temporal.io/visibility
def list_workflow_page(query = nil, page_size: nil, next_page_token: nil, rpc_options: nil)
@impl.list_workflow_page(Interceptor::ListWorkflowPageInput.new(query:,
next_page_token:,
page_size:,
rpc_options:))
end

# Count workflows.
Expand Down
16 changes: 9 additions & 7 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ def intercept_client(next_interceptor)
:rpc_options
)

# Input for {Outbound.list_workflows}.
ListWorkflowsInput = Data.define(
# Input for {Outbound.list_workflow_page}.
ListWorkflowPageInput = Data.define(
:query,
:next_page_token,
:page_size,
:rpc_options
)

Expand Down Expand Up @@ -281,12 +283,12 @@ def signal_with_start_workflow(input)
next_interceptor.signal_with_start_workflow(input)
end

# Called for every {Client.list_workflows} call.
# Called for every {Client.list_workflow_page} call.
#
# @param input [ListWorkflowsInput] Input.
# @return [Enumerator<WorkflowExecution>] Enumerable workflow executions.
def list_workflows(input)
next_interceptor.list_workflows(input)
# @param input [ListWorkflowPageInput] Input.
# @return [Client::ListWorkflowPage] Enumerable workflow executions, with a #next_page_token method.
def list_workflow_page(input)
next_interceptor.list_workflow_page(input)
end

# Called for every {Client.count_workflows} call.
Expand Down
35 changes: 17 additions & 18 deletions temporalio/lib/temporalio/internal/client/implementation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -326,25 +326,24 @@ def _start_workflow_request_from_with_start_options(klass, start_options)
)
end

def list_workflows(input)
Enumerator.new do |yielder|
req = Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new(
namespace: @client.namespace,
query: input.query || ''
)
loop do
resp = @client.workflow_service.list_workflow_executions(
req,
rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
)
resp.executions.each do |raw_info|
yielder << Temporalio::Client::WorkflowExecution.new(raw_info, @client.data_converter)
end
break if resp.next_page_token.empty?

req.next_page_token = resp.next_page_token
end
def list_workflow_page(input)
req = Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new(
namespace: @client.namespace,
query: input.query || '',
next_page_token: input.next_page_token,
page_size: input.page_size
)
resp = @client.workflow_service.list_workflow_executions(
req,
rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
)
executions = resp.executions.map do |raw_info|
Temporalio::Client::WorkflowExecution.new(raw_info, @client.data_converter)
end
Temporalio::Client::ListWorkflowPage.new(
executions: executions,
next_page_token: resp.next_page_token
)
end

def count_workflows(input)
Expand Down
17 changes: 17 additions & 0 deletions temporalio/sig/temporalio/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ module Temporalio
def with: (**untyped) -> Options
end

class ListWorkflowPage
attr_reader executions: Array[WorkflowExecution]
attr_reader next_page_token: String?

def initialize: (
executions: Array[WorkflowExecution],
next_page_token: String?
) -> void
end

def self.connect: (
String target_host,
String namespace,
Expand Down Expand Up @@ -139,6 +149,13 @@ module Temporalio
?rpc_options: RPCOptions?
) -> Enumerator[WorkflowExecution, WorkflowExecution]

def list_workflow_page: (
?String query,
?next_page_token: String?,
?page_size: Integer?,
?rpc_options: RPCOptions?
) -> ListWorkflowPage

def count_workflows: (
?String query,
?rpc_options: RPCOptions?
Expand Down
8 changes: 6 additions & 2 deletions temporalio/sig/temporalio/client/interceptor.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,16 @@ module Temporalio
) -> void
end

class ListWorkflowsInput
class ListWorkflowPageInput
attr_reader query: String?
attr_reader next_page_token: String?
attr_reader page_size: Integer?
attr_reader rpc_options: RPCOptions?

def initialize: (
query: String?,
next_page_token: String?,
page_size: Integer?,
rpc_options: RPCOptions?
) -> void
end
Expand Down Expand Up @@ -412,7 +416,7 @@ module Temporalio

def signal_with_start_workflow: (SignalWithStartWorkflowInput input) -> WorkflowHandle

def list_workflows: (ListWorkflowsInput input) -> Enumerator[WorkflowExecution, WorkflowExecution]
def list_workflow_page: (ListWorkflowPageInput input) -> Client::ListWorkflowPage

def count_workflows: (CountWorkflowsInput input) -> WorkflowExecutionCount

Expand Down
6 changes: 3 additions & 3 deletions temporalio/test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def start_workflow(input)
super
end

def list_workflows(input)
@root.calls.push(['list_workflows', input])
def list_workflow_page(input)
@root.calls.push(['list_workflow_page', input])
super
end

Expand Down Expand Up @@ -141,7 +141,7 @@ def test_interceptor
track.calls.clear
assert_empty client.list_workflows("WorkflowType = 'test-interceptor-does-not-exist'").to_a
assert_equal 0, client.count_workflows("WorkflowType = 'test-interceptor-does-not-exist'").count
assert_equal(%w[list_workflows count_workflows], track.calls.map(&:first))
assert_equal(%w[list_workflow_page count_workflows], track.calls.map(&:first))
end
end
end
15 changes: 15 additions & 0 deletions temporalio/test/client_workflow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,21 @@ def test_list_and_count
assert_equal 2, wfs.size
end

# Paginated list workflows query
assert_eventually do
wfs = env.client.list_workflow_page("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'", page_size: 2)
assert_equal 2, wfs.executions.size
all_wfs = wfs.executions
# Check next page
next_page = wfs.next_page_token
wfs = env.client.list_workflow_page("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'",
next_page_token: next_page)
assert_equal 3, wfs.executions.size
all_wfs += wfs.executions
# Check all IDs are present
assert_equal handles.map(&:id).sort, all_wfs.map(&:id).sort
end

# Normal count
assert_eventually do
count = env.client.count_workflows("`#{ATTR_KEY_TEXT.name}` = '#{text_val}'")
Expand Down
Loading