Skip to content

Commit be9c5cc

Browse files
committed
fix(update): fixes <Update>Async for those that exceed default grpc poll timeout
1 parent 8035010 commit be9c5cc

File tree

11 files changed

+305
-60
lines changed

11 files changed

+305
-60
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ A protoc plugin for generating typed Temporal clients and workers in Go from pro
1515
- [Service Options](#service-options)
1616
- [Method Options](#method-options)
1717
- [Bloblang Expressions](#bloblang-expressions)
18+
- [Workflow Update](#workflow-update)
1819
- [CLI](#cli)
1920
- [Test Client](#test-client)
2021
- [Cross-Namespace (XNS)](#cross-namespace-xns)
@@ -608,6 +609,13 @@ run, _ := example.ExecuteSayGreeting(context.Background(), &examplev1.SayGreetin
608609
require.Regexp(`^say-greeting/Howdy/Stranger/[a-f0-9-]{32}$`, run.ID())
609610
```
610611
612+
## Workflow Update
613+
614+
This plugin has experimental support for Temporal's experimental [Workflow Update](https://docs.temporal.io/workflows#update) capability. Note that this requires cluster support enabled with both of the following dynamic config values set to `true`
615+
616+
- `frontend.enableUpdateWorkflowExecution`
617+
- `frontend.enableUpdateWorkflowExecutionAsyncAccepted`
618+
611619
## CLI
612620
613621
This plugin can optionally generate a configurable CLI using [github.com/urfave/cli/v2](https://github.com/urfave/cli/v2). To enable this functionality, use the corresponding [plugin option](#plugin-options). When enabled, this plugin will generate a CLI command for each workflow, start-workflow-with-signal, query, and signal. Each command provides typed flags for configuring the corresponding inputs and options.

example/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
In an initial terminal:
44
1. Start temporal
55
```shell
6-
temporal server start-dev --dynamic-config-value "frontend.enableUpdateWorkflowExecution=true"
6+
temporal server start-dev \
7+
--dynamic-config-value "frontend.enableUpdateWorkflowExecution=true" \
8+
--dynamic-config-value "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true"
79
```
810

911
In a second terminal:

example/main.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"os/signal"
1111
"syscall"
12+
"time"
1213

1314
"github.com/cludden/protoc-gen-go-temporal/example/external"
1415
examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
@@ -52,7 +53,7 @@ func (wf *CreateFooWorkflow) Execute(ctx workflow.Context) (*examplev1.CreateFoo
5253
workflow.Go(ctx, func(ctx workflow.Context) {
5354
for {
5455
signal, _ := wf.SetFooProgress.Receive(ctx)
55-
wf.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: signal.GetProgress()})
56+
wf.setProgress(signal)
5657
}
5758
})
5859

@@ -84,6 +85,13 @@ func (wf *CreateFooWorkflow) GetFooProgress() (*examplev1.GetFooProgressResponse
8485

8586
// UpdateFooProgress defines the handler for an UpdateFooProgress update
8687
func (wf *CreateFooWorkflow) UpdateFooProgress(ctx workflow.Context, req *examplev1.SetFooProgressRequest) (*examplev1.GetFooProgressResponse, error) {
88+
if err := workflow.Sleep(ctx, time.Second*45); err != nil {
89+
return nil, err
90+
}
91+
return wf.setProgress(req)
92+
}
93+
94+
func (wf *CreateFooWorkflow) setProgress(req *examplev1.SetFooProgressRequest) (*examplev1.GetFooProgressResponse, error) {
8795
wf.progress = req.GetProgress()
8896
switch {
8997
case wf.progress < 0:

example/main_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ import (
1212
"github.com/cludden/protoc-gen-go-temporal/mocks/go.temporal.io/sdk/clientutils"
1313
"github.com/stretchr/testify/mock"
1414
"github.com/stretchr/testify/require"
15+
"go.temporal.io/api/enums/v1"
16+
"go.temporal.io/api/operatorservice/v1"
1517
"go.temporal.io/sdk/client"
18+
"go.temporal.io/sdk/testsuite"
19+
"go.temporal.io/sdk/worker"
1620
)
1721

1822
func TestCreateFooStartWorkflowOptions(t *testing.T) {
@@ -54,3 +58,67 @@ func TestCreateFooStartWorkflowOptions(t *testing.T) {
5458
_, err := example.CreateFooAsync(ctx, &examplev1.CreateFooInput{RequestName: "bar"})
5559
require.NoError(err)
5660
}
61+
62+
func TestUpdate(t *testing.T) {
63+
if testing.Short() {
64+
t.SkipNow()
65+
}
66+
67+
require, ctx := require.New(t), context.Background()
68+
69+
srv, err := testsuite.StartDevServer(ctx, testsuite.DevServerOptions{
70+
LogLevel: "fatal",
71+
ExtraArgs: []string{
72+
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
73+
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true",
74+
},
75+
})
76+
require.NoError(err)
77+
defer srv.Stop()
78+
79+
c := srv.Client()
80+
example := examplev1.NewExampleClient(c)
81+
82+
_, err = c.OperatorService().AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{
83+
Namespace: "default",
84+
SearchAttributes: map[string]enums.IndexedValueType{
85+
"foo": enums.INDEXED_VALUE_TYPE_TEXT,
86+
"created_at": enums.INDEXED_VALUE_TYPE_DATETIME,
87+
},
88+
})
89+
require.NoError(err)
90+
91+
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})
92+
examplev1.RegisterExampleActivities(w, &Activities{})
93+
examplev1.RegisterExampleWorkflows(w, &Workflows{})
94+
require.NoError(w.Start())
95+
defer w.Stop()
96+
defer c.Close()
97+
98+
run, err := example.CreateFooAsync(ctx, &examplev1.CreateFooInput{RequestName: "test"})
99+
require.NoError(err)
100+
101+
require.NoError(run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7}))
102+
103+
query, err := run.GetFooProgress(ctx)
104+
require.NoError(err)
105+
require.Equal(float32(5.7), query.GetProgress())
106+
require.Equal(examplev1.Foo_FOO_STATUS_CREATING.String(), query.GetStatus().String())
107+
108+
handle, err := run.UpdateFooProgressAsync(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
109+
require.NoError(err)
110+
111+
update, err := handle.Get(ctx)
112+
require.NoError(err)
113+
require.Equal(float32(100), update.GetProgress())
114+
require.Equal(examplev1.Foo_FOO_STATUS_READY.String(), update.GetStatus().String())
115+
116+
// update, err := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
117+
// require.NoError(err)
118+
// require.Equal(float32(100), update.GetProgress())
119+
// require.Equal(examplev1.Foo_FOO_STATUS_READY.String(), update.GetStatus().String())
120+
121+
resp, err := run.Get(ctx)
122+
require.NoError(err)
123+
require.Equal(examplev1.Foo_FOO_STATUS_READY.String(), resp.GetFoo().GetStatus().String())
124+
}

gen/example/v1/example_temporal.pb.go

Lines changed: 41 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/example/v1/examplev1xns/example_xns_temporal.pb.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)