Skip to content

Commit f024241

Browse files
MichaelSnowdenrodrigozhou
authored andcommitted
Use search attribute type map in visibility archival (#4304)
* Use search attribute type map in visibility archival * pr comments
1 parent 4e0c7ed commit f024241

File tree

2 files changed

+112
-175
lines changed

2 files changed

+112
-175
lines changed

service/history/archival/archiver.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
commonpb "go.temporal.io/api/common/v1"
3737
enumspb "go.temporal.io/api/enums/v1"
3838
"go.temporal.io/api/serviceerror"
39+
"go.temporal.io/server/common/persistence/visibility/manager"
3940
"go.uber.org/multierr"
4041

4142
archiverspb "go.temporal.io/server/api/archiver/v1"
@@ -92,10 +93,12 @@ type (
9293
}
9394

9495
archiver struct {
95-
archiverProvider provider.ArchiverProvider
96-
metricsHandler metrics.Handler
97-
logger log.Logger
98-
rateLimiter quotas.RateLimiter
96+
archiverProvider provider.ArchiverProvider
97+
metricsHandler metrics.Handler
98+
logger log.Logger
99+
rateLimiter quotas.RateLimiter
100+
searchAttributeProvider searchattribute.Provider
101+
visibilityManager manager.VisibilityManager
99102
}
100103
)
101104

@@ -110,12 +113,16 @@ func NewArchiver(
110113
logger log.Logger,
111114
metricsHandler metrics.Handler,
112115
rateLimiter quotas.RateLimiter,
116+
searchAttributeProvider searchattribute.Provider,
117+
visibilityManger manager.VisibilityManager,
113118
) Archiver {
114119
return &archiver{
115-
archiverProvider: archiverProvider,
116-
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ArchiverClientScope)),
117-
logger: logger,
118-
rateLimiter: rateLimiter,
120+
archiverProvider: archiverProvider,
121+
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ArchiverClientScope)),
122+
logger: logger,
123+
rateLimiter: rateLimiter,
124+
searchAttributeProvider: searchAttributeProvider,
125+
visibilityManager: visibilityManger,
119126
}
120127
}
121128

@@ -129,48 +136,64 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response
129136
tag.ArchivalRequestWorkflowID(request.WorkflowID),
130137
tag.ArchivalRequestRunID(request.RunID),
131138
)
139+
132140
defer func(start time.Time) {
133141
metricsScope := a.metricsHandler
142+
134143
status := "ok"
135144
if err != nil {
136145
status = "err"
146+
137147
var rateLimitExceededErr *serviceerror.ResourceExhausted
148+
138149
if errors.As(err, &rateLimitExceededErr) {
139150
status = "rate_limit_exceeded"
140151
}
152+
141153
logger.Warn("failed to archive workflow", tag.Error(err))
142154
}
155+
143156
metricsScope.Timer(metrics.ArchiverArchiveLatency.GetMetricName()).
144157
Record(time.Since(start), metrics.StringTag("status", status))
145158
}(time.Now())
159+
146160
numTargets := len(request.Targets)
147161
if err := a.rateLimiter.WaitN(ctx, numTargets); err != nil {
148162
return nil, &serviceerror.ResourceExhausted{
149163
Cause: enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT,
150164
Message: fmt.Sprintf("archival rate limited: %s", err.Error()),
151165
}
152166
}
167+
153168
var wg sync.WaitGroup
169+
154170
errs := make([]error, numTargets)
171+
155172
for i, target := range request.Targets {
156173
wg.Add(1)
174+
157175
i := i
176+
158177
switch target {
159178
case TargetHistory:
160179
go func() {
161180
defer wg.Done()
181+
162182
errs[i] = a.archiveHistory(ctx, request, logger)
163183
}()
164184
case TargetVisibility:
165185
go func() {
166186
defer wg.Done()
187+
167188
errs[i] = a.archiveVisibility(ctx, request, logger)
168189
}()
169190
default:
170191
return nil, fmt.Errorf("unknown archival target: %s", target)
171192
}
172193
}
194+
173195
wg.Wait()
196+
174197
return &Response{}, multierr.Combine(errs...)
175198
}
176199

@@ -212,11 +235,18 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
212235
return err
213236
}
214237

215-
// It is safe to pass nil to typeMap here because search attributes type must be embedded by caller.
216-
searchAttributes, err := searchattribute.Stringify(request.SearchAttributes, nil)
238+
// The types of the search attributes may not be embedded in the request,
239+
// so we fetch them from the search attributes provider here.
240+
saTypeMap, err := a.searchAttributeProvider.GetSearchAttributes(a.visibilityManager.GetIndexName(), false)
241+
if err != nil {
242+
return err
243+
}
244+
245+
searchAttributes, err := searchattribute.Stringify(request.SearchAttributes, &saTypeMap)
217246
if err != nil {
218247
return err
219248
}
249+
220250
return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{
221251
NamespaceId: request.NamespaceID,
222252
Namespace: request.Namespace,
@@ -238,11 +268,14 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
238268
// statement (this would make the err always nil).
239269
func (a *archiver) recordArchiveTargetResult(logger log.Logger, startTime time.Time, target Target, err *error) {
240270
duration := time.Since(startTime)
271+
241272
status := "ok"
242273
if *err != nil {
243274
status = "err"
275+
244276
logger.Error("failed to archive target", tag.NewStringTag("target", string(target)), tag.Error(*err))
245277
}
278+
246279
tags := []metrics.Tag{
247280
metrics.StringTag("target", string(target)),
248281
metrics.StringTag("status", status),

0 commit comments

Comments
 (0)