Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3127dd7
NOT FOR MERGE: Bump api-go version to include latest versioning API c…
ShahabT Mar 16, 2024
7a9c619
NOT FOR MERGE: Undo changes that we want to merge
antlai-temporal Mar 18, 2024
d10ada0
Redo initial changes that we want to merge
antlai-temporal Mar 18, 2024
115af43
NOT FOR MERGE: Update api-go version to latest
antlai-temporal Mar 20, 2024
c48e2a3
Support Update and Get for the new versioning rules (#1429)
antlai-temporal Apr 2, 2024
50fca78
NOT FOR MERGE: Update api-go version to latest
antlai-temporal Apr 2, 2024
69259e2
Change UpdateWorkerVersioningRules response to match GetWorkerVersion…
antlai-temporal Apr 3, 2024
b985605
NOT FOR MERGE: Update api-go version to latest
antlai-temporal Apr 4, 2024
4a45118
Rename insert_compatible_redirect_rule to add_compatible_redirect_rul…
antlai-temporal Apr 5, 2024
d9aa0b8
NOT FOR MERGE: disable grpc dial lint errors
antlai-temporal Apr 5, 2024
34a08c8
Update baseline to go sdk 1.26.1
antlai-temporal Apr 11, 2024
e8147ac
Implement DescribeTaskQueueEnhanced (#1453)
antlai-temporal May 3, 2024
c7108f8
Add new VersioningIntent options (#1470)
ShahabT May 14, 2024
a64c6e3
Merge remote-tracking branch 'upstream/master' into versioning-2
antlai-temporal May 29, 2024
6481094
Clean up comments for PR
antlai-temporal May 29, 2024
7807c94
Format deprecation
antlai-temporal May 30, 2024
1f0d12f
Improve GoDocs
antlai-temporal Jun 3, 2024
eadd267
Rename constants, remove pointers in options
antlai-temporal Jun 6, 2024
770a19e
Merge remote-tracking branch 'upstream/master' into versioning-2
antlai-temporal Jun 6, 2024
aa37dff
NOT FOR MERGE: test CI with ubuntu docker image
antlai-temporal Jun 6, 2024
f76ddc4
Adding sleep for reachability cache in test
antlai-temporal Jun 6, 2024
f3c4daf
Replace Sleep by Eventually in test
antlai-temporal Jun 6, 2024
ffc91bb
Remove extra test checks
antlai-temporal Jun 6, 2024
7957957
Undo 'Eventually' in test due to 1min cache TTL
antlai-temporal Jun 7, 2024
ff4541c
Merge remote-tracking branch 'upstream/master' into versioning-2
antlai-temporal Jun 7, 2024
ff82aaf
Undo test CI with ubuntu docker image
antlai-temporal Jun 7, 2024
2b9a66f
Skip new versioning tests with cli devserver
antlai-temporal Jun 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ test.log
.DS_Store
.gobincache
go.work
go.work.sum
go.work.sum
*~
228 changes: 213 additions & 15 deletions client/client.go

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,27 @@ type (
// GetWorkerTaskReachability returns which versions are is still in use by open or closed workflows.
GetWorkerTaskReachability(ctx context.Context, options *GetWorkerTaskReachabilityOptions) (*WorkerTaskReachability, error)

// DescribeTaskQueueEnhanced returns information about the target task queue, broken down by Build Id:
// - List of pollers
// - Workflow Reachability status
// - Backlog info for Workflow and/or Activity tasks
// When not supported by the server, it returns an empty [TaskQueueDescription] if there is no information
// about the task queue, or an error when the response identifies an unsupported server.
// Note that using a sticky queue as target is not supported.
// WARNING: Worker versioning is currently experimental, and requires server 1.24+
DescribeTaskQueueEnhanced(ctx context.Context, options *DescribeTaskQueueEnhancedOptions) (TaskQueueDescription, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the old new version took pointers for options, I think @cretz prefers it without pointers. The SDK is not consistent here, but we should try to be consistent for new APIs. I think without a pointer is probably nicer since we don't have to deal withnil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to change that, I saw 50/50 of APIs using pointers, and the old versioning always using it.
But if we do change it, we should also change the new UpdateWorkerVersioningRules and GetWorkerVersioningRules to not have pointers, so that all are consistent...
@cretz what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Confirmed on Slack, without pointers is best IMO


// UpdateWorkerVersioningRules allows updating the worker-build-id based assignment and redirect rules for a given
// task queue. This is used in conjunction with workers who specify their build id and thus opt into the feature.
// The errors it can return:
// - serviceerror.FailedPrecondition when the conflict token is invalid
// WARNING: Worker versioning is currently experimental, and requires server 1.24+
UpdateWorkerVersioningRules(ctx context.Context, options *UpdateWorkerVersioningRulesOptions) (*WorkerVersioningRules, error)

// GetWorkerVersioningRules returns the worker-build-id assignment and redirect rules for a task queue.
// WARNING: Worker versioning is currently experimental, and requires server 1.24+
GetWorkerVersioningRules(ctx context.Context, options *GetWorkerVersioningOptions) (*WorkerVersioningRules, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
Expand Down
321 changes: 321 additions & 0 deletions internal/internal_versioning_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"errors"
"time"

"go.temporal.io/api/common/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

enumspb "go.temporal.io/api/enums/v1"
)

// TaskQueueType specifies which category of tasks are associated with a queue.
type TaskQueueType int

const (
// TaskQueueTypeUnspecified indicates the task queue type was not specified.
TaskQueueTypeUnspecified = iota
// TaskQueueTypeWorkflow indicates the task queue is used for dispatching workflow tasks.
TaskQueueTypeWorkflow
// TaskQueueTypeActivity indicates the task queue is used for delivering activity tasks.
TaskQueueTypeActivity
// TaskQueueTypeNexus indicates the task queue is used for dispatching Nexus requests.
TaskQueueTypeNexus
)

// BuildIDTaskReachability specifies which category of tasks may reach a versioned worker of a certain Build ID.
//
// Note: future activities who inherit their workflow's Build ID but not its task queue will not be
// accounted for reachability as server cannot know if they'll happen as they do not use
// assignment rules of their task queue. Same goes for Child Workflows or Continue-As-New Workflows
// who inherit the parent/previous workflow's Build ID but not its task queue. In those cases, make
// sure to query reachability for the parent/previous workflow's task queue as well.
type BuildIDTaskReachability int

const (
// BuildIDTaskReachabilityUnspecified indicates that task reachability was not reported.
BuildIDTaskReachabilityUnspecified = iota
// BuildIDTaskReachabilityReachable indicates that this Build ID may be used by new workflows or activities
// (based on versioning rules), or there are open workflows or backlogged activities assigned to it.
BuildIDTaskReachabilityReachable
// BuildIDTaskReachabilityClosedWorkflowsOnly specifies that this Build ID does not have open workflows
// and is not reachable by new workflows, but MAY have closed workflows within the namespace retention period.
// Not applicable to activity-only task queues.
BuildIDTaskReachabilityClosedWorkflowsOnly
// BuildIDTaskReachabilityUnreachable indicates that this Build ID is not used for new executions, nor
// it has been used by any existing execution within the retention period.
BuildIDTaskReachabilityUnreachable
)

type (
// TaskQueueVersionSelection is a task queue filter based on versioning.
// It is an optional component of [DescribeTaskQueueEnhancedOptions].
TaskQueueVersionSelection struct {
// Include specific Build IDs.
BuildIDs []string
// Include the unversioned queue.
Unversioned bool
// Include all active versions. A version is active if it has had new
// tasks or polls recently.
AllActive bool
}

// DescribeTaskQueueEnhancedOptions is the input to [Client.DescribeTaskQueueEnhanced].
DescribeTaskQueueEnhancedOptions struct {
// Name of the task queue. Sticky queues are not supported.
TaskQueue string
// An optional queue selector based on versioning. If not provided,
// the result for the default Build ID will be returned. The default
// Build ID is the one mentioned in the first unconditional Assignment Rule.
// If there is no default Build ID, the result for the
// unversioned queue will be returned.
Versions *TaskQueueVersionSelection
// Task queue types to report info about. If not specified, all types are considered.
TaskQueueTypes []TaskQueueType
// Include list of pollers for requested task queue types and versions.
ReportPollers bool
// Include task reachability for the requested versions and all task types
// (task reachability is not reported per task type).
ReportTaskReachability bool
}

// WorkerVersionCapabilities includes a worker's build identifier
// and whether it is choosing to use the versioning feature.
// It is an optional component of [PollerInfo].
WorkerVersionCapabilities struct {
// Build ID of the worker.
BuildID string
// Whether the worker is using the versioning feature.
UseVersioning bool
}

// PollerInfo provides information about a worker/client polling a task queue.
// It is used by [TaskQueueTypeInfo].
PollerInfo struct {
// Time of the last poll. A value of zero means it was not set.
LastAccessTime time.Time
// The identity of the worker/client who is polling this task queue.
Identity string
// Polling rate. A value of zero means it was not set.
RatePerSecond float64
// Optional poller versioning capabilities. Available when a worker has opted into the worker versioning feature.
WorkerVersionCapabilities *WorkerVersionCapabilities
}

// TaskQueueTypeInfo specifies task queue information per task type and Build ID.
// It is included in [TaskQueueVersionInfo].
TaskQueueTypeInfo struct {
// Poller details for this task queue category.
Pollers []PollerInfo
}

// TaskQueueVersionInfo includes task queue information per Build ID.
// It is part of [TaskQueueDescription].
TaskQueueVersionInfo struct {
// Task queue info per task type.
TypesInfo map[TaskQueueType]TaskQueueTypeInfo
// The category of tasks that may reach a versioned worker of a certain Build ID.
TaskReachability BuildIDTaskReachability
}

// TaskQueueDescription is the response to [Client.DescribeTaskQueueEnhanced].
TaskQueueDescription struct {
// Task queue information for each Build ID. Empty string as key value means unversioned.
VersionsInfo map[string]TaskQueueVersionInfo
}
)

func (o *DescribeTaskQueueEnhancedOptions) validateAndConvertToProto(namespace string) (*workflowservice.DescribeTaskQueueRequest, error) {
if namespace == "" {
return nil, errors.New("missing namespace argument")
}

if o.TaskQueue == "" {
return nil, errors.New("missing task queue field")
}

taskQueueTypes := make([]enumspb.TaskQueueType, len(o.TaskQueueTypes))
for i, t := range o.TaskQueueTypes {
taskQueueTypes[i] = taskQueueTypeToProto(t)
}

opt := &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueuepb.TaskQueue{
// Sticky queues not supported
Name: o.TaskQueue,
},
ApiMode: enumspb.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
Versions: taskQueueVersionSelectionToProto(o.Versions),
TaskQueueTypes: taskQueueTypes,
ReportPollers: o.ReportPollers,
ReportTaskReachability: o.ReportTaskReachability,
}

return opt, nil
}

func workerVersionCapabilitiesFromResponse(response *common.WorkerVersionCapabilities) *WorkerVersionCapabilities {
if response == nil {
return nil
}

return &WorkerVersionCapabilities{
BuildID: response.GetBuildId(),
UseVersioning: response.GetUseVersioning(),
}
}

func pollerInfoFromResponse(response *taskqueuepb.PollerInfo) PollerInfo {
if response == nil {
return PollerInfo{}
}

lastAccessTime := time.Time{}
if response.GetLastAccessTime() != nil {
lastAccessTime = response.GetLastAccessTime().AsTime()
}

return PollerInfo{
LastAccessTime: lastAccessTime,
Identity: response.GetIdentity(),
RatePerSecond: response.GetRatePerSecond(),
WorkerVersionCapabilities: workerVersionCapabilitiesFromResponse(response.GetWorkerVersionCapabilities()),
}
}

func taskQueueTypeInfoFromResponse(response *taskqueuepb.TaskQueueTypeInfo) TaskQueueTypeInfo {
if response == nil {
return TaskQueueTypeInfo{}
}

pollers := make([]PollerInfo, len(response.GetPollers()))
for i, pInfo := range response.GetPollers() {
pollers[i] = pollerInfoFromResponse(pInfo)
}

return TaskQueueTypeInfo{
Pollers: pollers,
}
}

func taskQueueVersionInfoFromResponse(response *taskqueuepb.TaskQueueVersionInfo) TaskQueueVersionInfo {
if response == nil {
return TaskQueueVersionInfo{}
}

typesInfo := make(map[TaskQueueType]TaskQueueTypeInfo, len(response.GetTypesInfo()))
for taskType, tInfo := range response.GetTypesInfo() {
typesInfo[taskQueueTypeFromProto(enumspb.TaskQueueType(taskType))] = taskQueueTypeInfoFromResponse(tInfo)
}

return TaskQueueVersionInfo{
TypesInfo: typesInfo,
TaskReachability: buildIDTaskReachabilityFromProto(response.GetTaskReachability()),
}
}

func detectTaskQueueEnhancedNotSupported(response *workflowservice.DescribeTaskQueueResponse) error {
// A server before 1.24 returns a non-enhanced proto, which only fills `pollers` and `taskQueueStatus` fields
if len(response.GetVersionsInfo()) == 0 &&
(len(response.GetPollers()) > 0 || response.GetTaskQueueStatus() != nil) {
return errors.New("server does not support `DescribeTaskQueueEnhanced`")
}
return nil
}

func taskQueueDescriptionFromResponse(response *workflowservice.DescribeTaskQueueResponse) TaskQueueDescription {
if response == nil {
return TaskQueueDescription{}
}

versionsInfo := make(map[string]TaskQueueVersionInfo, len(response.GetVersionsInfo()))
for buildID, vInfo := range response.GetVersionsInfo() {
versionsInfo[buildID] = taskQueueVersionInfoFromResponse(vInfo)
}

return TaskQueueDescription{
VersionsInfo: versionsInfo,
}
}

func taskQueueVersionSelectionToProto(s *TaskQueueVersionSelection) *taskqueuepb.TaskQueueVersionSelection {
if s == nil {
return nil
}

return &taskqueuepb.TaskQueueVersionSelection{
BuildIds: s.BuildIDs,
Unversioned: s.Unversioned,
AllActive: s.AllActive,
}
}

func taskQueueTypeToProto(t TaskQueueType) enumspb.TaskQueueType {
switch t {
case TaskQueueTypeUnspecified:
return enumspb.TASK_QUEUE_TYPE_UNSPECIFIED
case TaskQueueTypeWorkflow:
return enumspb.TASK_QUEUE_TYPE_WORKFLOW
case TaskQueueTypeActivity:
return enumspb.TASK_QUEUE_TYPE_ACTIVITY
case TaskQueueTypeNexus:
return enumspb.TASK_QUEUE_TYPE_NEXUS
default:
panic("unknown task queue type")
}
}

func taskQueueTypeFromProto(t enumspb.TaskQueueType) TaskQueueType {
switch t {
case enumspb.TASK_QUEUE_TYPE_UNSPECIFIED:
return TaskQueueTypeUnspecified
case enumspb.TASK_QUEUE_TYPE_WORKFLOW:
return TaskQueueTypeWorkflow
case enumspb.TASK_QUEUE_TYPE_ACTIVITY:
return TaskQueueTypeActivity
case enumspb.TASK_QUEUE_TYPE_NEXUS:
return TaskQueueTypeNexus
default:
panic("unknown task queue type from proto")
}
}

func buildIDTaskReachabilityFromProto(r enumspb.BuildIdTaskReachability) BuildIDTaskReachability {
switch r {
case enumspb.BUILD_ID_TASK_REACHABILITY_UNSPECIFIED:
return BuildIDTaskReachabilityUnspecified
case enumspb.BUILD_ID_TASK_REACHABILITY_REACHABLE:
return BuildIDTaskReachabilityReachable
case enumspb.BUILD_ID_TASK_REACHABILITY_CLOSED_WORKFLOWS_ONLY:
return BuildIDTaskReachabilityClosedWorkflowsOnly
case enumspb.BUILD_ID_TASK_REACHABILITY_UNREACHABLE:
return BuildIDTaskReachabilityUnreachable
default:
panic("unknown task queue reachability")
}
}
Loading