Skip to content

Commit 7c0a0e0

Browse files
rodrigozhouyycptt
authored andcommitted
Change update namespace to upsert custom search attributes (#4080)
1 parent 6b747d3 commit 7c0a0e0

File tree

6 files changed

+110
-42
lines changed

6 files changed

+110
-42
lines changed

service/frontend/adminHandler.go

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -342,17 +342,21 @@ func (adh *AdminHandler) addSearchAttributesSQL(
342342
if nsName == "" {
343343
return errNamespaceNotSet
344344
}
345-
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
345+
resp, err := client.DescribeNamespace(
346+
ctx,
347+
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
348+
)
346349
if err != nil {
347350
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
348351
}
349352

350353
customSearchAttributes := currentSearchAttributes.Custom()
351-
mapper := ns.CustomSearchAttributesMapper()
352-
fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap())
354+
upsertFieldToAliasMap := make(map[string]string)
355+
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
356+
aliasToFieldMap := util.InverseMap(fieldToAliasMap)
353357
for saName, saType := range request.GetSearchAttributes() {
354358
// check if alias is already in use
355-
if _, err := mapper.GetFieldName(saName, nsName); err == nil {
359+
if _, ok := aliasToFieldMap[saName]; ok {
356360
return serviceerror.NewAlreadyExist(
357361
fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName),
358362
)
@@ -375,15 +379,18 @@ func (adh *AdminHandler) addSearchAttributesSQL(
375379
fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()),
376380
)
377381
}
378-
fieldToAliasMap[targetFieldName] = saName
382+
upsertFieldToAliasMap[targetFieldName] = saName
379383
}
380384

381385
_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
382386
Namespace: nsName,
383387
Config: &namespacepb.NamespaceConfig{
384-
CustomSearchAttributeAliases: fieldToAliasMap,
388+
CustomSearchAttributeAliases: upsertFieldToAliasMap,
385389
},
386390
})
391+
if err.Error() == errCustomSearchAttributeFieldAlreadyAllocated.Error() {
392+
return errRaceConditionAddingSearchAttributes
393+
}
387394
return err
388395
}
389396

@@ -470,25 +477,28 @@ func (adh *AdminHandler) removeSearchAttributesSQL(
470477
if nsName == "" {
471478
return errNamespaceNotSet
472479
}
473-
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
480+
resp, err := client.DescribeNamespace(
481+
ctx,
482+
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
483+
)
474484
if err != nil {
475485
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
476486
}
477487

478-
mapper := ns.CustomSearchAttributesMapper()
479-
fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap())
488+
upsertFieldToAliasMap := make(map[string]string)
489+
aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases)
480490
for _, saName := range request.GetSearchAttributes() {
481-
fieldName, err := mapper.GetFieldName(saName, nsName)
482-
if err != nil {
491+
fieldName, ok := aliasToFieldMap[saName]
492+
if !ok {
483493
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
484494
}
485-
delete(fieldToAliasMap, fieldName)
495+
upsertFieldToAliasMap[fieldName] = ""
486496
}
487497

488498
_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
489499
Namespace: nsName,
490500
Config: &namespacepb.NamespaceConfig{
491-
CustomSearchAttributeAliases: fieldToAliasMap,
501+
CustomSearchAttributeAliases: upsertFieldToAliasMap,
492502
},
493503
})
494504
return err
@@ -523,7 +533,7 @@ func (adh *AdminHandler) GetSearchAttributes(
523533
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
524534
return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
525535
}
526-
return adh.getSearchAttributesSQL(request, searchAttributes)
536+
return adh.getSearchAttributesSQL(ctx, request, searchAttributes)
527537
}
528538

529539
func (adh *AdminHandler) getSearchAttributesElasticsearch(
@@ -567,23 +577,36 @@ func (adh *AdminHandler) getSearchAttributesElasticsearch(
567577
}
568578

569579
func (adh *AdminHandler) getSearchAttributesSQL(
580+
ctx context.Context,
570581
request *adminservice.GetSearchAttributesRequest,
571582
searchAttributes searchattribute.NameTypeMap,
572583
) (*adminservice.GetSearchAttributesResponse, error) {
584+
_, client, err := adh.clientFactory.NewLocalFrontendClientWithTimeout(
585+
frontend.DefaultTimeout,
586+
frontend.DefaultLongPollTimeout,
587+
)
588+
if err != nil {
589+
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err))
590+
}
591+
573592
nsName := request.GetNamespace()
574593
if nsName == "" {
575594
return nil, errNamespaceNotSet
576595
}
577-
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
596+
resp, err := client.DescribeNamespace(
597+
ctx,
598+
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
599+
)
578600
if err != nil {
579601
return nil, serviceerror.NewUnavailable(
580602
fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName),
581603
)
582604
}
583-
mapper := ns.CustomSearchAttributesMapper()
605+
606+
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
584607
customSearchAttributes := make(map[string]enumspb.IndexedValueType)
585608
for field, tp := range searchAttributes.Custom() {
586-
if alias, err := mapper.GetAlias(field, nsName); err == nil {
609+
if alias, ok := fieldToAliasMap[field]; ok {
587610
customSearchAttributes[alias] = tp
588611
}
589612
}

service/frontend/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ var (
7878
errNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace is not set on request.")
7979
errReasonNotSet = serviceerror.NewInvalidArgument("Reason is not set on request.")
8080
errBatchOperationNotSet = serviceerror.NewInvalidArgument("Batch operation is not set on request.")
81+
errRaceConditionAddingSearchAttributes = serviceerror.NewUnavailable("Generated search attributes mapping unavailble.")
8182

8283
errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.")
8384
errUpdateInputNotSet = serviceerror.NewInvalidArgument("Update input is not set on request.")

service/frontend/fx.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,6 @@ func OperatorHandlerProvider(
507507
saManager searchattribute.Manager,
508508
healthServer *health.Server,
509509
historyClient historyservice.HistoryServiceClient,
510-
namespaceRegistry namespace.Registry,
511510
clusterMetadataManager persistence.ClusterMetadataManager,
512511
clusterMetadata cluster.Metadata,
513512
clientFactory client.Factory,
@@ -523,7 +522,6 @@ func OperatorHandlerProvider(
523522
saManager,
524523
healthServer,
525524
historyClient,
526-
namespaceRegistry,
527525
clusterMetadataManager,
528526
clusterMetadata,
529527
clientFactory,

service/frontend/namespace_handler.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"go.temporal.io/server/common/persistence"
5353
"go.temporal.io/server/common/primitives"
5454
"go.temporal.io/server/common/primitives/timestamp"
55+
"go.temporal.io/server/common/util"
5556
)
5657

5758
type (
@@ -107,6 +108,8 @@ var (
107108
errCannotDoNamespaceFailoverAndUpdate = serviceerror.NewInvalidArgument("Cannot set active cluster to current cluster when other parameters are set.")
108109
errInvalidRetentionPeriod = serviceerror.NewInvalidArgument("A valid retention period is not set on request.")
109110
errInvalidNamespaceStateUpdate = serviceerror.NewInvalidArgument("Invalid namespace state update.")
111+
112+
errCustomSearchAttributeFieldAlreadyAllocated = serviceerror.NewInvalidArgument("Custom search attribute field name already allocated.")
110113
)
111114

112115
// newNamespaceHandler create a new namespace handler
@@ -512,9 +515,16 @@ func (d *namespaceHandlerImpl) UpdateNamespace(
512515
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Total resetBinaries cannot exceed the max limit: %v", maxLength))
513516
}
514517
}
515-
if updatedConfig.CustomSearchAttributeAliases != nil {
518+
if len(updatedConfig.CustomSearchAttributeAliases) > 0 {
516519
configurationChanged = true
517-
config.CustomSearchAttributeAliases = updatedConfig.CustomSearchAttributeAliases
520+
csaAliases, err := d.upsertCustomSearchAttributesAliases(
521+
config.CustomSearchAttributeAliases,
522+
updatedConfig.CustomSearchAttributeAliases,
523+
)
524+
if err != nil {
525+
return nil, err
526+
}
527+
config.CustomSearchAttributeAliases = csaAliases
518528
}
519529
}
520530

@@ -778,6 +788,23 @@ func (d *namespaceHandlerImpl) mergeNamespaceData(
778788
return old
779789
}
780790

791+
func (d *namespaceHandlerImpl) upsertCustomSearchAttributesAliases(
792+
current map[string]string,
793+
upsert map[string]string,
794+
) (map[string]string, error) {
795+
result := util.CloneMapNonNil(current)
796+
for key, value := range upsert {
797+
if value == "" {
798+
delete(result, key)
799+
} else if _, ok := current[key]; !ok {
800+
result[key] = value
801+
} else {
802+
return nil, errCustomSearchAttributeFieldAlreadyAllocated
803+
}
804+
}
805+
return result, nil
806+
}
807+
781808
func (d *namespaceHandlerImpl) toArchivalRegisterEvent(
782809
state enumspb.ArchivalState,
783810
URI string,

service/frontend/operator_handler.go

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ type (
8484
saManager searchattribute.Manager
8585
healthServer *health.Server
8686
historyClient historyservice.HistoryServiceClient
87-
namespaceRegistry namespace.Registry
8887
clusterMetadataManager persistence.ClusterMetadataManager
8988
clusterMetadata clustermetadata.Metadata
9089
clientFactory svc.Factory
@@ -101,7 +100,6 @@ type (
101100
SaManager searchattribute.Manager
102101
healthServer *health.Server
103102
historyClient historyservice.HistoryServiceClient
104-
namespaceRegistry namespace.Registry
105103
clusterMetadataManager persistence.ClusterMetadataManager
106104
clusterMetadata clustermetadata.Metadata
107105
clientFactory svc.Factory
@@ -125,7 +123,6 @@ func NewOperatorHandlerImpl(
125123
saManager: args.SaManager,
126124
healthServer: args.healthServer,
127125
historyClient: args.historyClient,
128-
namespaceRegistry: args.namespaceRegistry,
129126
clusterMetadataManager: args.clusterMetadataManager,
130127
clusterMetadata: args.clusterMetadata,
131128
clientFactory: args.clientFactory,
@@ -280,18 +277,22 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL(
280277
if nsName == "" {
281278
return errNamespaceNotSet
282279
}
283-
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
280+
resp, err := client.DescribeNamespace(
281+
ctx,
282+
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
283+
)
284284
if err != nil {
285285
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
286286
}
287287

288288
dbCustomSearchAttributes := searchattribute.GetSqlDbIndexSearchAttributes().CustomSearchAttributes
289289
cmCustomSearchAttributes := currentSearchAttributes.Custom()
290-
mapper := ns.CustomSearchAttributesMapper()
291-
fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap())
290+
upsertFieldToAliasMap := make(map[string]string)
291+
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
292+
aliasToFieldMap := util.InverseMap(fieldToAliasMap)
292293
for saName, saType := range request.GetSearchAttributes() {
293294
// check if alias is already in use
294-
if _, err := mapper.GetFieldName(saName, nsName); err == nil {
295+
if _, ok := aliasToFieldMap[saName]; ok {
295296
return serviceerror.NewAlreadyExist(
296297
fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName),
297298
)
@@ -318,15 +319,18 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL(
318319
fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()),
319320
)
320321
}
321-
fieldToAliasMap[targetFieldName] = saName
322+
upsertFieldToAliasMap[targetFieldName] = saName
322323
}
323324

324325
_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
325326
Namespace: nsName,
326327
Config: &namespacepb.NamespaceConfig{
327-
CustomSearchAttributeAliases: fieldToAliasMap,
328+
CustomSearchAttributeAliases: upsertFieldToAliasMap,
328329
},
329330
})
331+
if err.Error() == errCustomSearchAttributeFieldAlreadyAllocated.Error() {
332+
return errRaceConditionAddingSearchAttributes
333+
}
330334
return err
331335
}
332336

@@ -410,25 +414,28 @@ func (h *OperatorHandlerImpl) removeSearchAttributesSQL(
410414
if nsName == "" {
411415
return errNamespaceNotSet
412416
}
413-
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
417+
resp, err := client.DescribeNamespace(
418+
ctx,
419+
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
420+
)
414421
if err != nil {
415422
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
416423
}
417424

418-
mapper := ns.CustomSearchAttributesMapper()
419-
fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap())
425+
upsertFieldToAliasMap := make(map[string]string)
426+
aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases)
420427
for _, saName := range request.GetSearchAttributes() {
421-
fieldName, err := mapper.GetFieldName(saName, nsName)
422-
if err != nil {
428+
fieldName, ok := aliasToFieldMap[saName]
429+
if !ok {
423430
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
424431
}
425-
delete(fieldToAliasMap, fieldName)
432+
upsertFieldToAliasMap[fieldName] = ""
426433
}
427434

428435
_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
429436
Namespace: nsName,
430437
Config: &namespacepb.NamespaceConfig{
431-
CustomSearchAttributeAliases: fieldToAliasMap,
438+
CustomSearchAttributeAliases: upsertFieldToAliasMap,
432439
},
433440
})
434441
return err
@@ -460,7 +467,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes(
460467
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
461468
return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
462469
}
463-
return h.listSearchAttributesSQL(request, searchAttributes)
470+
return h.listSearchAttributesSQL(ctx, request, searchAttributes)
464471
}
465472

466473
func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch(
@@ -486,23 +493,36 @@ func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch(
486493
}
487494

488495
func (h *OperatorHandlerImpl) listSearchAttributesSQL(
496+
ctx context.Context,
489497
request *operatorservice.ListSearchAttributesRequest,
490498
searchAttributes searchattribute.NameTypeMap,
491499
) (*operatorservice.ListSearchAttributesResponse, error) {
500+
_, client, err := h.clientFactory.NewLocalFrontendClientWithTimeout(
501+
frontend.DefaultTimeout,
502+
frontend.DefaultLongPollTimeout,
503+
)
504+
if err != nil {
505+
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err))
506+
}
507+
492508
nsName := request.GetNamespace()
493509
if nsName == "" {
494510
return nil, errNamespaceNotSet
495511
}
496-
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
512+
resp, err := client.DescribeNamespace(
513+
ctx,
514+
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
515+
)
497516
if err != nil {
498517
return nil, serviceerror.NewUnavailable(
499518
fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName),
500519
)
501520
}
502-
mapper := ns.CustomSearchAttributesMapper()
521+
522+
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
503523
customSearchAttributes := make(map[string]enumspb.IndexedValueType)
504524
for field, tp := range searchAttributes.Custom() {
505-
if alias, err := mapper.GetAlias(field, nsName); err == nil {
525+
if alias, ok := fieldToAliasMap[field]; ok {
506526
customSearchAttributes[alias] = tp
507527
}
508528
}

service/frontend/operator_handler_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ func (s *operatorHandlerSuite) SetupTest() {
8888
s.mockResource.GetSearchAttributesManager(),
8989
health.NewServer(),
9090
s.mockResource.GetHistoryClient(),
91-
s.mockResource.GetNamespaceRegistry(),
9291
s.mockResource.GetClusterMetadataManager(),
9392
s.mockResource.GetClusterMetadata(),
9493
s.mockResource.GetClientFactory(),

0 commit comments

Comments
 (0)