Skip to content

Commit 9e61426

Browse files
committed
refactor: update aws sdk , handling contexts, cancelling operations and all test passed
1 parent 279e449 commit 9e61426

File tree

13 files changed

+512
-197
lines changed

13 files changed

+512
-197
lines changed

cmd/sns-sqs/main.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,39 @@ package main
22

33
import (
44
"context"
5+
"os"
56
"time"
67

8+
"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
79
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
10+
awsconfig "github.com/aws/aws-sdk-go-v2/config"
811

912
"github.com/ThreeDotsLabs/watermill"
1013
"github.com/ThreeDotsLabs/watermill/message"
11-
"github.com/aws/aws-sdk-go/aws"
1214

1315
"github.com/ThreeDotsLabs/watermill-amazonsqs/sns"
1416
)
1517

1618
func main() {
1719
logger := watermill.NewStdLogger(true, true)
1820

19-
cfg := aws.Config{
20-
Region: aws.String("eu-north-1"),
21+
cfg, err := awsconfig.LoadDefaultConfig(
22+
context.Background(),
23+
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
24+
)
25+
if err != nil {
26+
panic(err)
2127
}
2228

2329
pub, err := sns.NewPublisher(sns.PublisherConfig{
24-
AWSConfig: cfg,
30+
AWSConfig: cfg,
31+
CreateTopicfNotExists: true,
2532
}, logger)
2633
if err != nil {
2734
panic(err)
2835
}
2936

30-
sub, err := sqs.NewSubsciber(sqs.SubscriberConfig{
37+
sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
3138
AWSConfig: cfg,
3239
}, logger)
3340
if err != nil {

cmd/sqs-sqs/main.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,51 @@ package main
22

33
import (
44
"context"
5+
"os"
56
"time"
67

78
"github.com/ThreeDotsLabs/watermill"
89
"github.com/ThreeDotsLabs/watermill/message"
9-
"github.com/aws/aws-sdk-go/aws"
10+
awsconfig "github.com/aws/aws-sdk-go-v2/config"
1011

12+
"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
1113
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
1214
)
1315

1416
func main() {
17+
ctx := context.Background()
1518
logger := watermill.NewStdLogger(true, true)
16-
17-
cfg := aws.Config{
18-
Region: aws.String("eu-north-1"),
19+
cfg, err := awsconfig.LoadDefaultConfig(
20+
context.Background(),
21+
awsconfig.WithRegion("us-west-2"),
22+
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
23+
)
24+
if err != nil {
25+
panic(err)
1926
}
20-
21-
pub, err := sqs.NewPublisher(sqs.PublisherConfig{
22-
AWSConfig: cfg,
23-
Marshaler: sqs.DefaultMarshalerUnmarshaler{},
27+
pub, err := sqs.NewPublisher(ctx, sqs.PublisherConfig{
28+
AWSConfig: cfg,
29+
CreateQueueIfNotExists: true,
30+
Marshaler: sqs.DefaultMarshalerUnmarshaler{},
2431
}, logger)
2532
if err != nil {
2633
panic(err)
2734
}
35+
_ = pub
2836

29-
sub, err := sqs.NewSubsciber(sqs.SubscriberConfig{
30-
AWSConfig: cfg,
31-
Unmarshaler: sqs.DefaultMarshalerUnmarshaler{},
37+
sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
38+
AWSConfig: cfg,
39+
CreateQueueInitializerConfig: sqs.QueueConfigAtrributes{},
40+
Unmarshaler: sqs.DefaultMarshalerUnmarshaler{},
3241
}, logger)
3342
if err != nil {
3443
panic(err)
3544
}
3645

37-
ctx := context.Background()
46+
err = sub.SubscribeInitialize("any-topic")
47+
if err != nil {
48+
panic(err)
49+
}
3850

3951
messages, err := sub.Subscribe(ctx, "any-topic")
4052
if err != nil {

connection/endpoint.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package connection
22

33
import (
4-
"os"
5-
6-
"github.com/aws/aws-sdk-go/aws"
4+
"github.com/aws/aws-sdk-go-v2/aws"
5+
"github.com/aws/aws-sdk-go-v2/config"
76
)
87

9-
const AWS_ENDPOINT = "AWS_ENDPOINT"
10-
11-
func SetEndPoint(config aws.Config) aws.Config {
12-
newConfig := config
13-
awsEndpoint := os.Getenv(AWS_ENDPOINT)
14-
if awsEndpoint != "" {
15-
newConfig.Endpoint = aws.String(awsEndpoint)
16-
}
17-
return newConfig
8+
func SetEndPoint(endpoint string) config.LoadOptionsFunc {
9+
return config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
10+
if endpoint != "" {
11+
return aws.Endpoint{
12+
PartitionID: "aws",
13+
URL: endpoint,
14+
SigningRegion: region,
15+
}, nil
16+
}
17+
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
18+
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
19+
}))
1820
}

go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ go 1.12
44

55
require (
66
github.com/ThreeDotsLabs/watermill v1.2.0
7-
github.com/aws/aws-sdk-go v1.25.11
7+
github.com/aws/aws-sdk-go-v2 v1.18.0
8+
github.com/aws/aws-sdk-go-v2/config v1.18.25
9+
github.com/aws/aws-sdk-go-v2/credentials v1.13.24
10+
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11
11+
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0
12+
github.com/hashicorp/go-multierror v1.1.1
813
github.com/stretchr/testify v1.8.1
914
)

go.sum

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,34 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
4343
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
4444
github.com/aws/aws-sdk-go v1.25.11 h1:wUivbsVOH3LpHdC3Rl5i+FLHfg4sOmYgv4bvHe7+/Pg=
4545
github.com/aws/aws-sdk-go v1.25.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
46+
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
47+
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
48+
github.com/aws/aws-sdk-go-v2/config v1.18.25 h1:JuYyZcnMPBiFqn87L2cRppo+rNwgah6YwD3VuyvaW6Q=
49+
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
50+
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 h1:PjiYyls3QdCrzqUN35jMWtUK1vqVZ+zLfdOa/UPFDp0=
51+
github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o=
52+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 h1:jJPgroehGvjrde3XufFIJUZVK5A2L9a3KwSFgKy9n8w=
53+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
54+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
55+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
56+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
57+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
58+
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 h1:gGLG7yKaXG02/jBlg210R7VgQIotiQntNhsCFejawx8=
59+
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
60+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAcCa2qVtRs7Ot5hItA2MsufrphbRFlz1Owxo=
61+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
62+
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11 h1:kUKAkuOhCCq/Av372Dtzg0oaAD5VEUYdDtU4lGIYKkw=
63+
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11/go.mod h1:WjBcrd28zNbbuAcIRO/n89sSeOxTuOZPiuxNXU/2WrI=
64+
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0 h1:ikSvot5NdywduxtkOwOa2GJFzFuJq1ZjXsGjoIA82Ao=
65+
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0/go.mod h1:ujUjm+PrcKUeIiKu2PT7MWjcyY0D6YZRZF3fSswiO+0=
66+
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 h1:UBQjaMTCKwyUYwiVnUt6toEJwGXsLBI6al083tpjJzY=
67+
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
68+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 h1:PkHIIJs8qvq0e5QybnZoG1K/9QTrLr9OsqCIo59jOBA=
69+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk=
70+
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 h1:2DQLAKDteoEDI8zpCzqBMaZlJuoE9iTYD0gFmXVax9E=
71+
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8=
72+
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
73+
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
4674
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
4775
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
4876
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
@@ -119,6 +147,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
119147
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
120148
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
121149
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
150+
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
122151
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
123152
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
124153
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -144,8 +173,11 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9
144173
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
145174
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
146175
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
147-
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
148176
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
177+
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
178+
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
179+
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
180+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
149181
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
150182
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
151183
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -529,7 +561,9 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
529561
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
530562
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
531563
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
564+
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
532565
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
566+
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
533567
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
534568
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
535569
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

sns/pub_test.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,40 @@
11
package sns
22

33
import (
4+
"context"
5+
"os"
46
"testing"
57

6-
"github.com/aws/aws-sdk-go/aws"
8+
awsconfig "github.com/aws/aws-sdk-go-v2/config"
9+
10+
"github.com/aws/aws-sdk-go-v2/aws"
11+
"github.com/aws/aws-sdk-go-v2/credentials"
712
"github.com/stretchr/testify/require"
813

914
"github.com/ThreeDotsLabs/watermill"
15+
"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
1016
)
1117

1218
func TestCreatePub(t *testing.T) {
1319
logger := watermill.NewStdLogger(true, true)
1420

15-
cfg := aws.Config{
16-
Region: aws.String("eu-north-1"),
17-
Endpoint: aws.String("http://localhost:9324"),
18-
}
21+
cfg, err := awsconfig.LoadDefaultConfig(
22+
context.Background(),
23+
awsconfig.WithRegion("us-west-2"),
24+
awsconfig.WithCredentialsProvider(credentials.StaticCredentialsProvider{
25+
Value: aws.Credentials{
26+
AccessKeyID: "test",
27+
SecretAccessKey: "test",
28+
},
29+
}),
30+
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
31+
)
32+
33+
require.NoError(t, err)
1934

20-
_, err := NewPublisher(PublisherConfig{
35+
_, err = NewPublisher(PublisherConfig{
2136
AWSConfig: cfg,
2237
}, logger)
38+
2339
require.NoError(t, err)
2440
}

sns/publisher.go

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,57 @@
11
package sns
22

33
import (
4-
"github.com/aws/aws-sdk-go/aws"
5-
"github.com/aws/aws-sdk-go/aws/session"
6-
"github.com/aws/aws-sdk-go/service/sns"
4+
"context"
5+
6+
"github.com/aws/aws-sdk-go-v2/aws"
7+
"github.com/aws/aws-sdk-go-v2/service/sns"
8+
"github.com/aws/aws-sdk-go-v2/service/sns/types"
79

810
"github.com/ThreeDotsLabs/watermill"
911
"github.com/ThreeDotsLabs/watermill/message"
10-
11-
"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
1212
)
1313

1414
type PublisherConfig struct {
15-
AWSConfig aws.Config
15+
AWSConfig aws.Config
16+
CreateTopicConfig SNSConfigAtrributes
17+
CreateTopicfNotExists bool
1618
}
1719

1820
type Publisher struct {
1921
config PublisherConfig
2022
logger watermill.LoggerAdapter
21-
sns *sns.SNS
23+
sns *sns.Client
2224
}
2325

2426
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
25-
config.AWSConfig = connection.SetEndPoint(config.AWSConfig)
26-
sess, err := session.NewSession(&config.AWSConfig)
27-
if err != nil {
28-
// TODO wrap
29-
return nil, err
30-
}
31-
3227
return &Publisher{
33-
sns: sns.New(sess),
28+
sns: sns.NewFromConfig(config.AWSConfig),
3429
config: config,
3530
logger: logger,
3631
}, nil
3732
}
3833

3934
func (p Publisher) Publish(topic string, messages ...*message.Message) error {
40-
// TODO method for generating
41-
topicInfo, err := p.sns.CreateTopic(&sns.CreateTopicInput{
42-
Name: aws.String(topic),
43-
})
35+
ctx := context.Background()
36+
topicArn, err := p.GetArnTopic(ctx, topic)
4437
if err != nil {
4538
return err
4639
}
4740

4841
for _, msg := range messages {
4942
p.logger.Debug("Sending message", watermill.LogFields{"msg": msg})
50-
_, err = p.sns.Publish(&sns.PublishInput{
51-
TopicArn: topicInfo.TopicArn,
52-
Message: aws.String(string(msg.Payload)),
43+
// Real messageId are generated on server side
44+
// so we can set our own here so we can use it in the tests
45+
// There is a deduplicationId but just for FIFO queues
46+
attributes := metadataToAttributes(msg.Metadata)
47+
attributes["UUID"] = types.MessageAttributeValue{
48+
StringValue: aws.String(msg.UUID),
49+
DataType: aws.String("String"),
50+
}
51+
_, err = p.sns.Publish(ctx, &sns.PublishInput{
52+
TopicArn: topicArn,
53+
Message: aws.String(string(msg.Payload)),
54+
MessageAttributes: attributes,
5355
})
5456
if err != nil {
5557
return err
@@ -59,6 +61,35 @@ func (p Publisher) Publish(topic string, messages ...*message.Message) error {
5961
return nil
6062
}
6163

64+
func (p Publisher) GetArnTopic(ctx context.Context, topic string) (*string, error) {
65+
topicARN, err := CheckARNTopic(ctx, p.sns, topic)
66+
if err != nil {
67+
if p.config.CreateTopicfNotExists {
68+
topicARN, err = CreateSNS(ctx, p.sns, topic, sns.CreateTopicInput{
69+
Attributes: p.config.CreateTopicConfig.Attributes(),
70+
})
71+
if err == nil {
72+
return topicARN, nil
73+
}
74+
}
75+
return nil, err
76+
}
77+
return topicARN, nil
78+
}
79+
6280
func (p Publisher) Close() error {
6381
return nil
6482
}
83+
84+
func metadataToAttributes(meta message.Metadata) map[string]types.MessageAttributeValue {
85+
attributes := make(map[string]types.MessageAttributeValue)
86+
87+
for k, v := range meta {
88+
attributes[k] = types.MessageAttributeValue{
89+
StringValue: aws.String(v),
90+
DataType: aws.String("String"),
91+
}
92+
}
93+
94+
return attributes
95+
}

0 commit comments

Comments
 (0)