Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
.idea/
.vscode/
.DS_Store
53 changes: 44 additions & 9 deletions cmd/sns-sqs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,45 @@ package main

import (
"context"
"os"
"time"

"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
awssns "github.com/aws/aws-sdk-go-v2/service/sns"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/aws/aws-sdk-go/aws"

"github.com/ThreeDotsLabs/watermill-amazonsqs/sns"
)

const SNS_TOPIC = "local-topic1"
const SQS_QUEUE = "local-queue4"

func main() {
logger := watermill.NewStdLogger(true, true)

cfg := aws.Config{
Region: aws.String("eu-north-1"),
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion("eu-north-1"),
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
)
if err != nil {
panic(err)
}

pub, err := sns.NewPublisher(sns.PublisherConfig{
AWSConfig: cfg,
AWSConfig: cfg,
CreateTopicfNotExists: true,
}, logger)
if err != nil {
panic(err)
}

sub, err := sqs.NewSubsciber(sqs.SubscriberConfig{
sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
AWSConfig: cfg,
}, logger)
if err != nil {
Expand All @@ -36,21 +49,43 @@ func main() {

ctx := context.Background()

messages, err := sub.Subscribe(ctx, "local-queue4")
messages, err := sub.Subscribe(ctx, SQS_QUEUE)
if err != nil {
panic(err)
}

pubArn, err := pub.GetArnTopic(ctx, SNS_TOPIC)
if err != nil {
panic(err)
}
sqsUrl, err := sub.GetQueueUrl(ctx, SQS_QUEUE)
if err != nil {
panic(err)
}

err = pub.AddSubscription(ctx, &awssns.SubscribeInput{
Protocol: aws.String("sqs"),
TopicArn: pubArn,
Endpoint: sqsUrl,
Attributes: map[string]string{
"RawMessageDelivery": "true",
},
})
if err != nil {
panic(err)
}

// Start consuming messages from SQS
go func() {
for m := range messages {
logger.With(watermill.LogFields{"message": m}).Info("Received message", nil)
logger.With(watermill.LogFields{"message": string(m.Payload)}).Info("Received message", nil)
m.Ack()
}
}()

// Start sending messages to SNS
for {
msg := message.NewMessage(watermill.NewULID(), []byte(`{"some_json": "body"}`))
err := pub.Publish("local-topic1", msg)
err := pub.Publish(SNS_TOPIC, msg)
if err != nil {
panic(err)
}
Expand Down
34 changes: 23 additions & 11 deletions cmd/sqs-sqs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,51 @@ package main

import (
"context"
"os"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/aws/aws-sdk-go/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"

"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
)

func main() {
ctx := context.Background()
logger := watermill.NewStdLogger(true, true)

cfg := aws.Config{
Region: aws.String("eu-north-1"),
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion("eu-north-1"),
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
)
if err != nil {
panic(err)
}

pub, err := sqs.NewPublisher(sqs.PublisherConfig{
AWSConfig: cfg,
Marshaler: sqs.DefaultMarshalerUnmarshaler{},
AWSConfig: cfg,
CreateQueueIfNotExists: true,
Marshaler: sqs.DefaultMarshalerUnmarshaler{},
}, logger)
if err != nil {
panic(err)
}
_ = pub

sub, err := sqs.NewSubsciber(sqs.SubscriberConfig{
AWSConfig: cfg,
Unmarshaler: sqs.DefaultMarshalerUnmarshaler{},
sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
AWSConfig: cfg,
CreateQueueInitializerConfig: sqs.QueueConfigAtrributes{},
Unmarshaler: sqs.DefaultMarshalerUnmarshaler{},
}, logger)
if err != nil {
panic(err)
}

ctx := context.Background()
err = sub.SubscribeInitialize("any-topic")
if err != nil {
panic(err)
}

messages, err := sub.Subscribe(ctx, "any-topic")
if err != nil {
Expand Down
26 changes: 14 additions & 12 deletions connection/endpoint.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package connection

import (
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
)

const AWS_ENDPOINT = "AWS_ENDPOINT"

func SetEndPoint(config aws.Config) aws.Config {
newConfig := config
awsEndpoint := os.Getenv(AWS_ENDPOINT)
if awsEndpoint != "" {
newConfig.Endpoint = aws.String(awsEndpoint)
}
return newConfig
func SetEndPoint(endpoint string) config.LoadOptionsFunc {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func SetEndPoint(endpoint string) config.LoadOptionsFunc {
func EndpointOption(endpoint string) config.LoadOptionsFunc {

Since its no longer setting anything

Also the package connection is a bit iffy on this one, maybe config, where other config related functionality could reside in the future? Nitpick

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the review Nikola!

return config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
if endpoint != "" {
return aws.Endpoint{
PartitionID: "aws",
URL: endpoint,
SigningRegion: region,
}, nil
}
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
}))
}
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ go 1.12

require (
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/aws/aws-sdk-go v1.25.11
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2/config v1.18.25
github.com/aws/aws-sdk-go-v2/credentials v1.13.24
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0
github.com/aws/smithy-go v1.13.5
github.com/hashicorp/go-multierror v1.1.1
github.com/stretchr/testify v1.8.1
)
38 changes: 32 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,34 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aws/aws-sdk-go v1.25.11 h1:wUivbsVOH3LpHdC3Rl5i+FLHfg4sOmYgv4bvHe7+/Pg=
github.com/aws/aws-sdk-go v1.25.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/config v1.18.25 h1:JuYyZcnMPBiFqn87L2cRppo+rNwgah6YwD3VuyvaW6Q=
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 h1:PjiYyls3QdCrzqUN35jMWtUK1vqVZ+zLfdOa/UPFDp0=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 h1:jJPgroehGvjrde3XufFIJUZVK5A2L9a3KwSFgKy9n8w=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 h1:gGLG7yKaXG02/jBlg210R7VgQIotiQntNhsCFejawx8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAcCa2qVtRs7Ot5hItA2MsufrphbRFlz1Owxo=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11 h1:kUKAkuOhCCq/Av372Dtzg0oaAD5VEUYdDtU4lGIYKkw=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11/go.mod h1:WjBcrd28zNbbuAcIRO/n89sSeOxTuOZPiuxNXU/2WrI=
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0 h1:ikSvot5NdywduxtkOwOa2GJFzFuJq1ZjXsGjoIA82Ao=
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0/go.mod h1:ujUjm+PrcKUeIiKu2PT7MWjcyY0D6YZRZF3fSswiO+0=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 h1:UBQjaMTCKwyUYwiVnUt6toEJwGXsLBI6al083tpjJzY=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 h1:PkHIIJs8qvq0e5QybnZoG1K/9QTrLr9OsqCIo59jOBA=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 h1:2DQLAKDteoEDI8zpCzqBMaZlJuoE9iTYD0gFmXVax9E=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -119,6 +145,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand All @@ -144,8 +171,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -312,7 +339,6 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -388,7 +414,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -529,6 +554,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
57 changes: 57 additions & 0 deletions sns/marshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sns

import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"

"github.com/ThreeDotsLabs/watermill/message"
)

const UUIDAttribute = "UUID"

type Marshaler interface {
Marshal(msg *message.Message) *sns.PublishInput
}

type DefaultMarshalerUnmarshaler struct{}

func (d DefaultMarshalerUnmarshaler) Marshal(msg *message.Message) *sns.PublishInput {
// client side uuid
// there is a deduplication id that can be use for
// fifo queues
attributes, deduplicationId, groupId := metadataToAttributes(msg.Metadata)
attributes[UUIDAttribute] = types.MessageAttributeValue{
StringValue: aws.String(msg.UUID),
DataType: aws.String("String"),
}

return &sns.PublishInput{
Message: aws.String(string(msg.Payload)),
MessageAttributes: attributes,
MessageDeduplicationId: deduplicationId,
MessageGroupId: groupId,
}
}

func metadataToAttributes(meta message.Metadata) (map[string]types.MessageAttributeValue, *string, *string) {
attributes := make(map[string]types.MessageAttributeValue)
var deduplicationId, groupId *string
for k, v := range meta {
// SNS has special attributes for deduplication and group id
if k == "MessageDeduplicationId" {
deduplicationId = aws.String(v)
continue
}
if k == "MessageGroupId" {
groupId = aws.String(v)
continue
}
attributes[k] = types.MessageAttributeValue{
StringValue: aws.String(v),
DataType: aws.String("String"),
}
}

return attributes, deduplicationId, groupId
}
Loading