Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (wp *Workflow) handleUpdate(name string, id string, input *commonpb.Payload
},
input,
header,
wp.getWorkflowWorkerPid(),
)
}

Expand All @@ -94,6 +95,7 @@ func (wp *Workflow) handleCancel() {
internal.CancelWorkflow{RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID},
nil,
wp.header,
wp.getWorkflowWorkerPid(),
)
}

Expand All @@ -107,6 +109,7 @@ func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *
},
input,
header,
wp.getWorkflowWorkerPid(),
)

return nil
Expand Down Expand Up @@ -223,7 +226,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
return errors.E(op, err)
}

wp.mq.PushResponse(msg.ID, result)
wp.mq.PushResponse(msg.ID, result, wp.getWorkflowWorkerPid())
err = wp.flushQueue()
if err != nil {
return errors.E(op, err)
Expand Down Expand Up @@ -279,7 +282,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
case *internal.CompleteWorkflow:
wp.log.Debug("complete workflow request", zap.Uint64("ID", msg.ID))
result, _ := wp.env.GetDataConverter().ToPayloads(completed)
wp.mq.PushResponse(msg.ID, result)
wp.mq.PushResponse(msg.ID, result, wp.getWorkflowWorkerPid())

if msg.Failure == nil {
wp.env.Complete(msg.Payloads, nil)
Expand All @@ -291,7 +294,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
case *internal.ContinueAsNew:
wp.log.Debug("continue-as-new request", zap.Uint64("ID", msg.ID), zap.String("name", command.Name))
result, _ := wp.env.GetDataConverter().ToPayloads(completed)
wp.mq.PushResponse(msg.ID, result)
wp.mq.PushResponse(msg.ID, result, wp.getWorkflowWorkerPid())

wp.env.Complete(nil, &workflow.ContinueAsNewError{
WorkflowType: &bindings.WorkflowType{
Expand Down Expand Up @@ -503,7 +506,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
}

result, _ := wp.env.GetDataConverter().ToPayloads(completed)
wp.mq.PushResponse(msg.ID, result)
wp.mq.PushResponse(msg.ID, result, wp.getWorkflowWorkerPid())

err = wp.flushQueue()
if err != nil {
Expand Down Expand Up @@ -540,12 +543,12 @@ func (wp *Workflow) createLocalActivityCallback(id uint64) bindings.LocalActivit

if lar.Err != nil {
wp.log.Debug("error", zap.Error(lar.Err), zap.Int32("attempt", lar.Attempt), zap.Duration("backoff", lar.Backoff))
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(lar.Err))
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(lar.Err), wp.getWorkflowWorkerPid())
return
}

wp.log.Debug("pushing local activity response", zap.Uint64("ID", id))
wp.mq.PushResponse(id, lar.Result)
wp.mq.PushResponse(id, lar.Result, wp.getWorkflowWorkerPid())
}

return func(lar *bindings.LocalActivityResultWrapper) {
Expand All @@ -571,13 +574,13 @@ func (wp *Workflow) createCallback(id uint64, t string) bindings.ResultHandler {

if err != nil {
wp.log.Debug("error", zap.Error(err), zap.String("type", t))
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err))
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err), wp.getWorkflowWorkerPid())
return
}

wp.log.Debug("pushing response", zap.Uint64("ID", id), zap.String("type", t))
// fetch original payload
wp.mq.PushResponse(id, result)
wp.mq.PushResponse(id, result, wp.getWorkflowWorkerPid())
}

return func(result *commonpb.Payloads, err error) {
Expand All @@ -603,11 +606,11 @@ func (wp *Workflow) createContinuableCallback(id uint64, t string) bindings.Resu
wp.canceller.Discard(id)

if err != nil {
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err))
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err), wp.getWorkflowWorkerPid())
return
}

wp.mq.PushResponse(id, result)
wp.mq.PushResponse(id, result, wp.getWorkflowWorkerPid())
err = wp.flushQueue()
if err != nil {
panic(err)
Expand Down Expand Up @@ -678,7 +681,9 @@ func (wp *Workflow) flushQueue() error {
func (wp *Workflow) runCommand(cmd any, payloads *commonpb.Payloads, header *commonpb.Header) (*internal.Message, error) {
const op = errors.Op("workflow_process_runcommand")
msg := &internal.Message{}
wp.mq.AllocateMessage(cmd, payloads, header, msg)
// attempt to prevent sending the response from the dead worker

wp.mq.AllocateMessage(cmd, payloads, header, msg, wp.getWorkflowWorkerPid())

if wp.mh != nil {
wp.mh.Gauge(RrMetricName).Update(float64(wp.pool.QueueSize()))
Expand Down Expand Up @@ -734,6 +739,17 @@ func (wp *Workflow) runCommand(cmd any, payloads *commonpb.Payloads, header *com
return msgs[0], nil
}

func (wp *Workflow) getWorkflowWorkerPid() int {
wp.log.Debug("fetching workflow worker pid")
wfw := wp.pool.Workers()
if len(wfw) > 0 {
wp.log.Debug("workflow worker pid found", zap.Int("pid", int(wfw[0].Pid())))
return int(wfw[0].Pid())
}
wp.log.Debug("workflow worker pid not found")
return 0
}

func (wp *Workflow) getPld() *payload.Payload {
return wp.pldPool.Get().(*payload.Payload)
}
Expand Down
2 changes: 1 addition & 1 deletion aggregatedpool/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo
)
}

// interceptor used here to the headers
// interceptor used here to the headers
wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, NewWorkerInterceptor())
for _, interceptor := range interceptors {
wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, interceptor.WorkerInterceptor())
Expand Down
3 changes: 2 additions & 1 deletion aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
stwfcmd,
input,
wp.header,
wp.getWorkflowWorkerPid(),
)
}

Expand Down Expand Up @@ -361,7 +362,7 @@ func (wp *Workflow) StackTrace() string {

func (wp *Workflow) Close() {
wp.log.Debug("close workflow", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID))
// when closing workflow, we should drain(execute) unhandled updates
// when closing the workflow, we should drain(execute) unhandled updates
if wp.env.DrainUnhandledUpdates() {
wp.log.Info("drained unhandled updates")
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.25.0
require (
github.com/goccy/go-json v0.10.5
github.com/google/uuid v1.6.0
github.com/roadrunner-server/api/v4 v4.22.1
github.com/roadrunner-server/api/v4 v4.23.0
github.com/roadrunner-server/endure/v2 v2.6.2
github.com/roadrunner-server/errors v1.4.1
github.com/roadrunner-server/events v1.0.1
Expand Down Expand Up @@ -58,13 +58,13 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.45.0 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/time v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251007200510-49b9836ed3ff // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101 // indirect
google.golang.org/grpc v1.76.0
gopkg.in/yaml.v3 v3.0.1 // indirect
)
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
github.com/roadrunner-server/api/v4 v4.22.1 h1:m+VkbRhRm8VwW+6cF2SsLlKou4IihEgwIOrZBjQe6KE=
github.com/roadrunner-server/api/v4 v4.22.1/go.mod h1:nZM+QWvCYaP3o5ZYlwp6ie+9jbAYlxsnd7tbMxU777c=
github.com/roadrunner-server/api/v4 v4.23.0 h1:lrVXgP4ozD/H5DrIdT181ldVhD1R9QT5qsi8qWUTDF4=
github.com/roadrunner-server/api/v4 v4.23.0/go.mod h1:AlHuVVOklb7XF33Cf7IfmwOn3j4gGg37on9Xi6j08Bg=
github.com/roadrunner-server/endure/v2 v2.6.2 h1:sIB4kTyE7gtT3fDhuYWUYn6Vt/dcPtiA6FoNS1eS+84=
github.com/roadrunner-server/endure/v2 v2.6.2/go.mod h1:t/2+xpNYgGBwhzn83y2MDhvhZ19UVq1REcvqn7j7RB8=
github.com/roadrunner-server/errors v1.4.1 h1:LKNeaCGiwd3t8IaL840ZNF3UA9yDQlpvHnKddnh0YRQ=
Expand Down Expand Up @@ -293,8 +293,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM=
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -371,10 +371,10 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto/googleapis/api v0.0.0-20251007200510-49b9836ed3ff h1:8Zg5TdmcbU8A7CXGjGXF1Slqu/nIFCRaR3S5gT2plIA=
google.golang.org/genproto/googleapis/api v0.0.0-20251007200510-49b9836ed3ff/go.mod h1:dbWfpVPvW/RqafStmRWBUpMN14puDezDMHxNYiRfQu0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff h1:A90eA31Wq6HOMIQlLfzFwzqGKBTuaVztYu/g8sn+8Zc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101 h1:vk5TfqZHNn0obhPIYeS+cxIFKFQgser/M2jnI+9c6MM=
google.golang.org/genproto/googleapis/api v0.0.0-20251103181224-f26f9409b101/go.mod h1:E17fc4PDhkr22dE3RgnH2hEubUaky6ZwW4VhANxyspg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101 h1:tRPGkdGHuewF4UisLzzHHr1spKw92qLM98nIzxbC0wY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand Down
Loading
Loading