Skip to content

Commit 5790fc4

Browse files
guyinyouguyinyou
andauthored
golang: Add namespace in Resource and metadata (#753)
* metadata和resource支持namespace字段 * add ut --------- Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent 78a347e commit 5790fc4

9 files changed

Lines changed: 78 additions & 23 deletions

golang/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,8 @@ func (cli *defaultClient) queryRoute(ctx context.Context, topic string, duration
377377
func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteRequest {
378378
return &v2.QueryRouteRequest{
379379
Topic: &v2.Resource{
380-
Name: topic,
380+
Name: topic,
381+
ResourceNamespace: cli.config.NameSpace,
381382
},
382383
Endpoints: cli.accessPoint,
383384
}
@@ -599,6 +600,8 @@ func (cli *defaultClient) Sign(ctx context.Context) context.Context {
599600
innerMD.VersionValue,
600601
innerMD.ClintID,
601602
cli.clientID,
603+
innerMD.NameSpace,
604+
cli.config.NameSpace,
602605
innerMD.DateTime,
603606
now,
604607
innerMD.Authorization,

golang/client_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,8 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
298298
func Test_routeEqual(t *testing.T) {
299299
oldMq := &v2.MessageQueue{
300300
Topic: &v2.Resource{
301-
Name: "topic-test",
301+
Name: "topic-test",
302+
ResourceNamespace: "ns-test",
302303
},
303304
Id: 0,
304305
Permission: v2.Permission_READ_WRITE,
@@ -313,7 +314,8 @@ func Test_routeEqual(t *testing.T) {
313314
}
314315
newMq := &v2.MessageQueue{
315316
Topic: &v2.Resource{
316-
Name: "topic-test",
317+
Name: "topic-test",
318+
ResourceNamespace: "ns-test",
317319
},
318320
Id: 0,
319321
Permission: v2.Permission_READ_WRITE,

golang/metadata/metadata.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
package metadata
1919

2020
const (
21-
LanguageKey = "x-mq-language"
22-
ProtocolKey = "x-mq-protocol"
23-
RequestID = "x-mq-request-id"
24-
VersionKey = "x-mq-client-version"
25-
// NameSpace = "x-mq-namespace"
21+
LanguageKey = "x-mq-language"
22+
ProtocolKey = "x-mq-protocol"
23+
RequestID = "x-mq-request-id"
24+
VersionKey = "x-mq-client-version"
25+
NameSpace = "x-mq-namespace"
2626
DateTime = "x-mq-date-time"
2727
ClintID = "x-mq-client-id"
2828
Authorization = "authorization"

golang/producer.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error)
155155
}
156156
for _, topic := range po.topics {
157157
topicResource := &v2.Resource{
158-
Name: topic,
158+
Name: topic,
159+
ResourceNamespace: config.NameSpace,
159160
}
160161
p.pSetting.topics.Store(topic, topicResource)
161162
}
@@ -287,7 +288,7 @@ func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txE
287288
var err error
288289
pubMessage = uMsg.pubMsg
289290
if uMsg.pubMsg == nil {
290-
pubMessage, err = NewPublishingMessage(msg, p.pSetting, txEnabled)
291+
pubMessage, err = NewPublishingMessage(msg, p.cli.config.NameSpace, p.pSetting, txEnabled)
291292
if err != nil {
292293
return nil, err
293294
}
@@ -315,7 +316,8 @@ func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txE
315316
}
316317
if _, ok := p.pSetting.topics.Load(topicName); !ok {
317318
p.pSetting.topics.Store(topicName, &v2.Resource{
318-
Name: topicName,
319+
Name: topicName,
320+
ResourceNamespace: p.cli.config.NameSpace,
319321
})
320322
}
321323
pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
@@ -362,7 +364,7 @@ func (p *defaultProducer) SendWithTransaction(ctx context.Context, msg *Message,
362364
return nil, fmt.Errorf("producer is not running")
363365
}
364366
t := transaction.(*transactionImpl)
365-
pubMessage, err := t.tryAddMessage(msg)
367+
pubMessage, err := t.tryAddMessage(msg, p.cli.config.NameSpace)
366368
if err != nil {
367369
return nil, err
368370
}
@@ -394,7 +396,8 @@ func (p *defaultProducer) endTransaction(ctx context.Context, endpoints *v2.Endp
394396
ctx = p.cli.Sign(ctx)
395397
request := &v2.EndTransactionRequest{
396398
Topic: &v2.Resource{
397-
Name: messageCommon.topic,
399+
Name: messageCommon.topic,
400+
ResourceNamespace: p.cli.config.NameSpace,
398401
},
399402
MessageId: messageId,
400403
TransactionId: transactionId,

golang/publishing_message.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ import (
2626
)
2727

2828
type PublishingMessage struct {
29+
namespace string
2930
msg *Message
3031
encoding v2.Encoding
3132
messageId string
3233
messageType v2.MessageType
3334
traceContext *string
3435
}
3536

36-
var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
37+
var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
3738
if msg == nil {
3839
return nil, fmt.Errorf("message is nil")
3940
}
@@ -51,6 +52,8 @@ var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnab
5152
// No need to compress message body.
5253
pMsg.encoding = v2.Encoding_IDENTITY
5354

55+
pMsg.namespace = namespace
56+
5457
// Generate message id.
5558
pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String()
5659
// Normal message.
@@ -84,7 +87,8 @@ func (pMsg *PublishingMessage) toProtobuf() (*v2.Message, error) {
8487
msg := &v2.Message{
8588
Topic: &v2.Resource{
8689
// ResourceNamespace: b.conn.Config().NameSpace,
87-
Name: pMsg.msg.Topic,
90+
Name: pMsg.msg.Topic,
91+
ResourceNamespace: pMsg.namespace,
8892
},
8993
SystemProperties: &v2.SystemProperties{
9094
Keys: pMsg.msg.GetKeys(),

golang/publishing_message_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package golang
19+
20+
import "testing"
21+
22+
func TestNewPublishingMessage(t *testing.T) {
23+
namespace := "ns-test"
24+
pSetting := &producerSettings{}
25+
msg := &Message{}
26+
pMsg, err := NewPublishingMessage(msg, namespace, pSetting, false)
27+
if err != nil {
28+
t.Error(err)
29+
}
30+
v2Msg, err := pMsg.toProtobuf()
31+
if err != nil {
32+
t.Error(err)
33+
}
34+
if v2Msg.GetTopic().GetResourceNamespace() != namespace {
35+
t.Error("namespace not equal")
36+
}
37+
}

golang/simple_consumer.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,12 @@ func (sc *defaultSimpleConsumer) changeInvisibleDuration0(messageView *MessageVi
7575
ctx := sc.cli.Sign(context.Background())
7676
request := &v2.ChangeInvisibleDurationRequest{
7777
Topic: &v2.Resource{
78-
Name: messageView.GetTopic(),
78+
Name: messageView.GetTopic(),
79+
ResourceNamespace: sc.cli.config.NameSpace,
7980
},
8081
Group: &v2.Resource{
81-
Name: sc.groupName,
82+
Name: sc.groupName,
83+
ResourceNamespace: sc.cli.config.NameSpace,
8284
},
8385
ReceiptHandle: messageView.GetReceiptHandle(),
8486
InvisibleDuration: durationpb.New(invisibleDuration),
@@ -166,7 +168,8 @@ func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int, messag
166168

167169
return &v2.ReceiveMessageRequest{
168170
Group: &v2.Resource{
169-
Name: sc.groupName,
171+
Name: sc.groupName,
172+
ResourceNamespace: sc.cli.config.NameSpace,
170173
},
171174
MessageQueue: messageQueue,
172175
FilterExpression: &v2.FilterExpression{
@@ -183,7 +186,8 @@ func (sc *defaultSimpleConsumer) wrapAckMessageRequest(messageView *MessageView)
183186
return &v2.AckMessageRequest{
184187
Group: sc.scSettings.groupName,
185188
Topic: &v2.Resource{
186-
Name: messageView.GetTopic(),
189+
Name: messageView.GetTopic(),
190+
ResourceNamespace: sc.cli.config.NameSpace,
187191
},
188192
Entries: []*v2.AckMessageEntry{
189193
{
@@ -369,7 +373,8 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp
369373
requestTimeout: sc.cli.opts.timeout,
370374

371375
groupName: &v2.Resource{
372-
Name: sc.groupName,
376+
Name: sc.groupName,
377+
ResourceNamespace: config.NameSpace,
373378
},
374379
longPollingTimeout: scOpts.awaitDuration,
375380
subscriptionExpressions: scOpts.subscriptionExpressions,

golang/simple_consumer_options.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ func (sc *simpleConsumerSettings) toProtobuf() *v2.Settings {
156156
subscriptions := make([]*v2.SubscriptionEntry, 0)
157157
for k, v := range sc.subscriptionExpressions {
158158
topic := &v2.Resource{
159-
Name: k,
159+
Name: k,
160+
ResourceNamespace: sc.groupName.GetResourceNamespace(),
160161
}
161162
filterExpression := &v2.FilterExpression{
162163
Expression: v.expression,

golang/transaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (t *transactionImpl) RollBack() error {
8888
return nil
8989
}
9090

91-
func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, error) {
91+
func (t *transactionImpl) tryAddMessage(message *Message, namespace string) (*PublishingMessage, error) {
9292
t.messagesLock.RLock()
9393
if len(t.messages) > MAX_MESSAGE_NUM {
9494
return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
@@ -100,7 +100,7 @@ func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, e
100100
if len(t.messages) > MAX_MESSAGE_NUM {
101101
return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
102102
}
103-
pubMessage, err := NewPublishingMessage(message, t.producerImpl.(*defaultProducer).pSetting, true)
103+
pubMessage, err := NewPublishingMessage(message, namespace, t.producerImpl.(*defaultProducer).pSetting, true)
104104
if err != nil {
105105
return nil, err
106106
}

0 commit comments

Comments
 (0)