Skip to content

Commit 87863ef

Browse files
authored
Merge pull request #14 from Mariscal6/refactor/sqs-sns
Refactor SQS and SNS
2 parents 279e449 + 58269f6 commit 87863ef

File tree

15 files changed

+716
-210
lines changed

15 files changed

+716
-210
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
.idea/
2+
.vscode/
3+
.DS_Store

cmd/sns-sqs/main.go

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,45 @@ package main
22

33
import (
44
"context"
5+
"os"
56
"time"
67

8+
"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
79
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
10+
"github.com/aws/aws-sdk-go-v2/aws"
11+
awsconfig "github.com/aws/aws-sdk-go-v2/config"
12+
awssns "github.com/aws/aws-sdk-go-v2/service/sns"
813

914
"github.com/ThreeDotsLabs/watermill"
1015
"github.com/ThreeDotsLabs/watermill/message"
11-
"github.com/aws/aws-sdk-go/aws"
1216

1317
"github.com/ThreeDotsLabs/watermill-amazonsqs/sns"
1418
)
1519

20+
const SNS_TOPIC = "local-topic1"
21+
const SQS_QUEUE = "local-queue4"
22+
1623
func main() {
1724
logger := watermill.NewStdLogger(true, true)
1825

19-
cfg := aws.Config{
20-
Region: aws.String("eu-north-1"),
26+
cfg, err := awsconfig.LoadDefaultConfig(
27+
context.Background(),
28+
awsconfig.WithRegion("eu-north-1"),
29+
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
30+
)
31+
if err != nil {
32+
panic(err)
2133
}
2234

2335
pub, err := sns.NewPublisher(sns.PublisherConfig{
24-
AWSConfig: cfg,
36+
AWSConfig: cfg,
37+
CreateTopicfNotExists: true,
2538
}, logger)
2639
if err != nil {
2740
panic(err)
2841
}
2942

30-
sub, err := sqs.NewSubsciber(sqs.SubscriberConfig{
43+
sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
3144
AWSConfig: cfg,
3245
}, logger)
3346
if err != nil {
@@ -36,21 +49,43 @@ func main() {
3649

3750
ctx := context.Background()
3851

39-
messages, err := sub.Subscribe(ctx, "local-queue4")
52+
messages, err := sub.Subscribe(ctx, SQS_QUEUE)
53+
if err != nil {
54+
panic(err)
55+
}
56+
57+
pubArn, err := pub.GetArnTopic(ctx, SNS_TOPIC)
58+
if err != nil {
59+
panic(err)
60+
}
61+
sqsUrl, err := sub.GetQueueUrl(ctx, SQS_QUEUE)
62+
if err != nil {
63+
panic(err)
64+
}
65+
66+
err = pub.AddSubscription(ctx, &awssns.SubscribeInput{
67+
Protocol: aws.String("sqs"),
68+
TopicArn: pubArn,
69+
Endpoint: sqsUrl,
70+
Attributes: map[string]string{
71+
"RawMessageDelivery": "true",
72+
},
73+
})
4074
if err != nil {
4175
panic(err)
4276
}
4377

78+
// Start consuming messages from SQS
4479
go func() {
4580
for m := range messages {
46-
logger.With(watermill.LogFields{"message": m}).Info("Received message", nil)
81+
logger.With(watermill.LogFields{"message": string(m.Payload)}).Info("Received message", nil)
4782
m.Ack()
4883
}
4984
}()
50-
85+
// Start sending messages to SNS
5186
for {
5287
msg := message.NewMessage(watermill.NewULID(), []byte(`{"some_json": "body"}`))
53-
err := pub.Publish("local-topic1", msg)
88+
err := pub.Publish(SNS_TOPIC, msg)
5489
if err != nil {
5590
panic(err)
5691
}

cmd/sqs-sqs/main.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,51 @@ package main
22

33
import (
44
"context"
5+
"os"
56
"time"
67

78
"github.com/ThreeDotsLabs/watermill"
89
"github.com/ThreeDotsLabs/watermill/message"
9-
"github.com/aws/aws-sdk-go/aws"
10+
awsconfig "github.com/aws/aws-sdk-go-v2/config"
1011

12+
"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
1113
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
1214
)
1315

1416
func main() {
17+
ctx := context.Background()
1518
logger := watermill.NewStdLogger(true, true)
16-
17-
cfg := aws.Config{
18-
Region: aws.String("eu-north-1"),
19+
cfg, err := awsconfig.LoadDefaultConfig(
20+
context.Background(),
21+
awsconfig.WithRegion("eu-north-1"),
22+
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
23+
)
24+
if err != nil {
25+
panic(err)
1926
}
20-
2127
pub, err := sqs.NewPublisher(sqs.PublisherConfig{
22-
AWSConfig: cfg,
23-
Marshaler: sqs.DefaultMarshalerUnmarshaler{},
28+
AWSConfig: cfg,
29+
CreateQueueIfNotExists: true,
30+
Marshaler: sqs.DefaultMarshalerUnmarshaler{},
2431
}, logger)
2532
if err != nil {
2633
panic(err)
2734
}
35+
_ = pub
2836

29-
sub, err := sqs.NewSubsciber(sqs.SubscriberConfig{
30-
AWSConfig: cfg,
31-
Unmarshaler: sqs.DefaultMarshalerUnmarshaler{},
37+
sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
38+
AWSConfig: cfg,
39+
CreateQueueInitializerConfig: sqs.QueueConfigAtrributes{},
40+
Unmarshaler: sqs.DefaultMarshalerUnmarshaler{},
3241
}, logger)
3342
if err != nil {
3443
panic(err)
3544
}
3645

37-
ctx := context.Background()
46+
err = sub.SubscribeInitialize("any-topic")
47+
if err != nil {
48+
panic(err)
49+
}
3850

3951
messages, err := sub.Subscribe(ctx, "any-topic")
4052
if err != nil {

connection/endpoint.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package connection
22

33
import (
4-
"os"
5-
6-
"github.com/aws/aws-sdk-go/aws"
4+
"github.com/aws/aws-sdk-go-v2/aws"
5+
"github.com/aws/aws-sdk-go-v2/config"
76
)
87

9-
const AWS_ENDPOINT = "AWS_ENDPOINT"
10-
11-
func SetEndPoint(config aws.Config) aws.Config {
12-
newConfig := config
13-
awsEndpoint := os.Getenv(AWS_ENDPOINT)
14-
if awsEndpoint != "" {
15-
newConfig.Endpoint = aws.String(awsEndpoint)
16-
}
17-
return newConfig
8+
func SetEndPoint(endpoint string) config.LoadOptionsFunc {
9+
return config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
10+
if endpoint != "" {
11+
return aws.Endpoint{
12+
PartitionID: "aws",
13+
URL: endpoint,
14+
SigningRegion: region,
15+
}, nil
16+
}
17+
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
18+
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
19+
}))
1820
}

go.mod

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ go 1.12
44

55
require (
66
github.com/ThreeDotsLabs/watermill v1.2.0
7-
github.com/aws/aws-sdk-go v1.25.11
7+
github.com/aws/aws-sdk-go-v2 v1.18.0
8+
github.com/aws/aws-sdk-go-v2/config v1.18.25
9+
github.com/aws/aws-sdk-go-v2/credentials v1.13.24
10+
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11
11+
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0
12+
github.com/aws/smithy-go v1.13.5
13+
github.com/hashicorp/go-multierror v1.1.1
814
github.com/stretchr/testify v1.8.1
915
)

go.sum

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,34 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
4141
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
4242
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
4343
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
44-
github.com/aws/aws-sdk-go v1.25.11 h1:wUivbsVOH3LpHdC3Rl5i+FLHfg4sOmYgv4bvHe7+/Pg=
45-
github.com/aws/aws-sdk-go v1.25.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
44+
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
45+
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
46+
github.com/aws/aws-sdk-go-v2/config v1.18.25 h1:JuYyZcnMPBiFqn87L2cRppo+rNwgah6YwD3VuyvaW6Q=
47+
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
48+
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 h1:PjiYyls3QdCrzqUN35jMWtUK1vqVZ+zLfdOa/UPFDp0=
49+
github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o=
50+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 h1:jJPgroehGvjrde3XufFIJUZVK5A2L9a3KwSFgKy9n8w=
51+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
52+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
53+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
54+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
55+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
56+
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 h1:gGLG7yKaXG02/jBlg210R7VgQIotiQntNhsCFejawx8=
57+
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
58+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAcCa2qVtRs7Ot5hItA2MsufrphbRFlz1Owxo=
59+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
60+
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11 h1:kUKAkuOhCCq/Av372Dtzg0oaAD5VEUYdDtU4lGIYKkw=
61+
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11/go.mod h1:WjBcrd28zNbbuAcIRO/n89sSeOxTuOZPiuxNXU/2WrI=
62+
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0 h1:ikSvot5NdywduxtkOwOa2GJFzFuJq1ZjXsGjoIA82Ao=
63+
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0/go.mod h1:ujUjm+PrcKUeIiKu2PT7MWjcyY0D6YZRZF3fSswiO+0=
64+
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 h1:UBQjaMTCKwyUYwiVnUt6toEJwGXsLBI6al083tpjJzY=
65+
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
66+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 h1:PkHIIJs8qvq0e5QybnZoG1K/9QTrLr9OsqCIo59jOBA=
67+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk=
68+
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 h1:2DQLAKDteoEDI8zpCzqBMaZlJuoE9iTYD0gFmXVax9E=
69+
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8=
70+
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
71+
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
4672
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
4773
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
4874
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
@@ -119,6 +145,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
119145
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
120146
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
121147
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
148+
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
122149
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
123150
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
124151
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -144,8 +171,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9
144171
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
145172
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
146173
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
147-
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
148-
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
174+
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
175+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
149176
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
150177
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
151178
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -312,7 +339,6 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su
312339
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
313340
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
314341
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
315-
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
316342
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
317343
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
318344
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -388,7 +414,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
388414
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
389415
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
390416
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
391-
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
392417
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
393418
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
394419
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -529,6 +554,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
529554
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
530555
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
531556
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
557+
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
532558
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
533559
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
534560
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

sns/marshaler.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package sns
2+
3+
import (
4+
"github.com/aws/aws-sdk-go-v2/aws"
5+
"github.com/aws/aws-sdk-go-v2/service/sns"
6+
"github.com/aws/aws-sdk-go-v2/service/sns/types"
7+
8+
"github.com/ThreeDotsLabs/watermill/message"
9+
)
10+
11+
const UUIDAttribute = "UUID"
12+
13+
type Marshaler interface {
14+
Marshal(msg *message.Message) *sns.PublishInput
15+
}
16+
17+
type DefaultMarshalerUnmarshaler struct{}
18+
19+
func (d DefaultMarshalerUnmarshaler) Marshal(msg *message.Message) *sns.PublishInput {
20+
// client side uuid
21+
// there is a deduplication id that can be use for
22+
// fifo queues
23+
attributes, deduplicationId, groupId := metadataToAttributes(msg.Metadata)
24+
attributes[UUIDAttribute] = types.MessageAttributeValue{
25+
StringValue: aws.String(msg.UUID),
26+
DataType: aws.String("String"),
27+
}
28+
29+
return &sns.PublishInput{
30+
Message: aws.String(string(msg.Payload)),
31+
MessageAttributes: attributes,
32+
MessageDeduplicationId: deduplicationId,
33+
MessageGroupId: groupId,
34+
}
35+
}
36+
37+
func metadataToAttributes(meta message.Metadata) (map[string]types.MessageAttributeValue, *string, *string) {
38+
attributes := make(map[string]types.MessageAttributeValue)
39+
var deduplicationId, groupId *string
40+
for k, v := range meta {
41+
// SNS has special attributes for deduplication and group id
42+
if k == "MessageDeduplicationId" {
43+
deduplicationId = aws.String(v)
44+
continue
45+
}
46+
if k == "MessageGroupId" {
47+
groupId = aws.String(v)
48+
continue
49+
}
50+
attributes[k] = types.MessageAttributeValue{
51+
StringValue: aws.String(v),
52+
DataType: aws.String("String"),
53+
}
54+
}
55+
56+
return attributes, deduplicationId, groupId
57+
}

0 commit comments

Comments
 (0)