Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions chasm/lib/callback/chasm_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"

"github.com/google/uuid"
"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -56,9 +57,14 @@ func (c chasmInvocation) Invoke(
task *callbackspb.InvocationTask,
taskAttr chasm.TaskAttributes,
) invocationResult {
header := nexus.Header(c.nexus.GetHeader())
if header == nil {
header = nexus.Header{}
}

// Get back the base64-encoded ComponentRef from the header.
encodedRef, ok := c.nexus.GetHeader()[commonnexus.CallbackTokenHeader]
if !ok {
encodedRef := header.Get(commonnexus.CallbackTokenHeader)
if encodedRef == "" {
return invocationResultFail{logInternalError(e.logger, "callback missing token", nil)}
}

Expand Down
7 changes: 1 addition & 6 deletions chasm/lib/callback/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,12 @@ import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/chasm"
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/nexus/nexusrpc"
queueserrors "go.temporal.io/server/service/history/queues/errors"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this get removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned in the description, it was unused and had the wrong value.

Archetype chasm.Archetype = "Callback"
)

type CompletionSource interface {
GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.OperationCompletion, error)
}
Expand Down Expand Up @@ -90,7 +85,7 @@ func (c *Callback) loadInvocationArgs(
)
}

if variant.Url == chasmnexus.CompletionHandlerURL {
if variant.Url == chasm.NexusCompletionHandlerURL {
return chasmInvocation{
nexus: variant,
attempt: c.Attempt,
Expand Down
7 changes: 3 additions & 4 deletions chasm/lib/callback/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -551,9 +550,9 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
timeSource.Update(time.Now())

// Create headers
headers := make(map[string]string)
headers := nexus.Header{}
if tc.headerValue != "" {
headers[commonnexus.CallbackTokenHeader] = tc.headerValue
headers.Set(commonnexus.CallbackTokenHeader, tc.headerValue)
}

// Create callback with chasm internal URL
Expand All @@ -564,7 +563,7 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
Callback: &callbackspb.Callback{
Variant: &callbackspb.Callback_Nexus_{
Nexus: &callbackspb.Callback_Nexus{
Url: chasmnexus.CompletionHandlerURL,
Url: chasm.NexusCompletionHandlerURL,
Header: headers,
},
},
Expand Down
3 changes: 1 addition & 2 deletions chasm/lib/scheduler/invoker_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
schedulespb "go.temporal.io/server/api/schedule/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -164,7 +163,7 @@ func (e *InvokerExecuteTaskExecutor) Execute(
lastCompletionState = common.CloneProto(lcs)

// Set up the completion callback to handle workflow results.
cb, err := chasmnexus.GetCallback(ctx, s)
cb, err := chasm.GenerateNexusCallback(ctx, s)
if err != nil {
return struct{}{}, err
}
Expand Down
3 changes: 1 addition & 2 deletions chasm/lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/common"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/worker/scheduler"
Expand Down Expand Up @@ -385,7 +384,7 @@ func (s *Scheduler) recordActionResult(result *schedulerActionResult) {
}
}

var _ chasmnexus.CompletionHandler = &Scheduler{}
var _ chasm.NexusCompletionHandler = &Scheduler{}

func executionStatusFromFailure(failure *failurepb.Failure) enumspb.WorkflowExecutionStatus {
switch failure.FailureInfo.(type) {
Expand Down
50 changes: 0 additions & 50 deletions chasm/nexus/nexus_completion.go

This file was deleted.

47 changes: 47 additions & 0 deletions chasm/nexus_completion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package chasm

import (
"encoding/base64"

commonpb "go.temporal.io/api/common/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
)

// NexusCompletionHandlerURL is the user-visible URL for Nexus->CHASM callbacks.
const NexusCompletionHandlerURL = "temporal://internal"

// NexusCompletionHandler is implemented by CHASM components that want to handle Nexus operation completion callbacks.
type NexusCompletionHandler interface {
HandleNexusCompletion(ctx MutableContext, completion *persistencespb.ChasmNexusCompletion) error
}

// NexusCompletionHandlerComponent is a CHASM [Component] that also implements [NexusCompletionHandler].
type NexusCompletionHandlerComponent interface {
Component
NexusCompletionHandler
}

// GenerateNexusCallback generates a Callback message indicating a CHASM component to receive Nexus operation completion
// callbacks. Particularly useful for components that want to track a workflow start with StartWorkflowExecution.
func GenerateNexusCallback(ctx Context, component NexusCompletionHandlerComponent) (*commonpb.Callback, error) {
ref, err := ctx.Ref(component)
if err != nil {
return nil, err
}

encodedRef := base64.RawURLEncoding.EncodeToString(ref)
headers := map[string]string{
// NOTE: There's a constant defined for this in common/nexus but to avoid circular dependencies we redefine it here.
// This is acceptable since we are going to eventually have a strongly typed field for passing tokens around.
"temporal-callback-token": encodedRef,
}

return &commonpb.Callback{
Variant: &commonpb.Callback_Nexus_{
Nexus: &commonpb.Callback_Nexus{
Url: NexusCompletionHandlerURL,
Header: headers,
},
},
}, nil
}
10 changes: 8 additions & 2 deletions components/callbacks/chasm_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"

"github.com/google/uuid"
"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -44,8 +45,13 @@ func logInternalError(logger log.Logger, internalMsg string, internalErr error)

func (c chasmInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult {
// Get back the base64-encoded ComponentRef from the header.
encodedRef, ok := c.nexus.GetHeader()[commonnexus.CallbackTokenHeader]
if !ok {
header := nexus.Header(c.nexus.GetHeader())
if header == nil {
header = nexus.Header{}
}

encodedRef := header.Get(commonnexus.CallbackTokenHeader)
if encodedRef == "" {
return invocationResultFail{logInternalError(e.Logger, "callback missing token", nil)}
}

Expand Down
3 changes: 1 addition & 2 deletions components/callbacks/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net/http"

"go.temporal.io/server/chasm"
chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -159,7 +158,7 @@ func (e taskExecutor) loadInvocationArgs(

// CHASM internal callbacks make use of Nexus as their callback delivery
// mechanism, but with the internal delivery URL.
if variant.Url == chasmnexus.CompletionHandlerURL {
if variant.Url == chasm.NexusCompletionHandlerURL {
invokable = chasmInvocation{
nexus: variant,
attempt: callback.Attempt,
Expand Down
11 changes: 5 additions & 6 deletions components/callbacks/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/historyservicemock/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -485,9 +485,9 @@ func TestProcessInvocationTaskChasm_Outcomes(t *testing.T) {
)
historyClient := tc.setupHistoryClient(t, ctrl)

headers := make(map[string]string)
headers := nexus.Header{}
if tc.headerValue != "" {
headers[commonnexus.CallbackTokenHeader] = tc.headerValue
headers.Set(commonnexus.CallbackTokenHeader, tc.headerValue)
}

// Create mutable state with the test completion
Expand All @@ -507,7 +507,7 @@ func TestProcessInvocationTaskChasm_Outcomes(t *testing.T) {
Callback: &persistencespb.Callback{
Variant: &persistencespb.Callback_Nexus_{
Nexus: &persistencespb.Callback_Nexus{
Url: chasmnexus.CompletionHandlerURL,
Url: chasm.NexusCompletionHandlerURL,
Header: headers,
},
},
Expand Down Expand Up @@ -554,8 +554,7 @@ func TestProcessInvocationTaskChasm_Outcomes(t *testing.T) {
)

if tc.expectsInternalError {
require.Error(t, err)
require.Contains(t, err.Error(), "internal error, reference-id:")
require.ErrorContains(t, err, "internal error, reference-id:")
} else {
require.NoError(t, err)
}
Expand Down
3 changes: 1 addition & 2 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
replicationspb "go.temporal.io/server/api/replication/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/chasm"
chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/client/history"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -2397,7 +2396,7 @@ func (h *Handler) CompleteNexusOperationChasm(
_, _, err := chasm.UpdateComponent(
ctx,
request.GetCompletion().GetComponentRef(),
func(c chasmnexus.CompletionHandler, ctx chasm.MutableContext, completion *persistencespb.ChasmNexusCompletion) (chasm.NoValue, error) {
func(c chasm.NexusCompletionHandler, ctx chasm.MutableContext, completion *persistencespb.ChasmNexusCompletion) (chasm.NoValue, error) {
return nil, c.HandleNexusCompletion(ctx, completion)
},
completion)
Expand Down
Loading