Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
.idea/
.vscode/
.DS_Store
53 changes: 44 additions & 9 deletions cmd/sns-sqs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,45 @@ package main

import (
"context"
"os"
"time"

"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
awssns "github.com/aws/aws-sdk-go-v2/service/sns"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/aws/aws-sdk-go/aws"

"github.com/ThreeDotsLabs/watermill-amazonsqs/sns"
)

const SNS_TOPIC = "local-topic1"
const SQS_QUEUE = "local-queue4"

func main() {
logger := watermill.NewStdLogger(true, true)

cfg := aws.Config{
Region: aws.String("eu-north-1"),
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion("eu-north-1"),
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
)
if err != nil {
panic(err)
}

pub, err := sns.NewPublisher(sns.PublisherConfig{
AWSConfig: cfg,
AWSConfig: cfg,
CreateTopicfNotExists: true,
}, logger)
if err != nil {
panic(err)
}

sub, err := sqs.NewSubsciber(sqs.SubscriberConfig{
sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
AWSConfig: cfg,
}, logger)
if err != nil {
Expand All @@ -36,21 +49,43 @@ func main() {

ctx := context.Background()

messages, err := sub.Subscribe(ctx, "local-queue4")
messages, err := sub.Subscribe(ctx, SQS_QUEUE)
if err != nil {
panic(err)
}

pubArn, err := pub.GetArnTopic(ctx, SNS_TOPIC)
if err != nil {
panic(err)
}
sqsUrl, err := sub.GetQueueUrl(ctx, SQS_QUEUE)
if err != nil {
panic(err)
}

err = pub.AddSubscription(ctx, &awssns.SubscribeInput{
Protocol: aws.String("sqs"),
TopicArn: pubArn,
Endpoint: sqsUrl,
Attributes: map[string]string{
"RawMessageDelivery": "true",
},
})
if err != nil {
panic(err)
}

// Start consuming messages from SQS
go func() {
for m := range messages {
logger.With(watermill.LogFields{"message": m}).Info("Received message", nil)
logger.With(watermill.LogFields{"message": string(m.Payload)}).Info("Received message", nil)
m.Ack()
}
}()

// Start sending messages to SNS
for {
msg := message.NewMessage(watermill.NewULID(), []byte(`{"some_json": "body"}`))
err := pub.Publish("local-topic1", msg)
err := pub.Publish(SNS_TOPIC, msg)
if err != nil {
panic(err)
}
Expand Down
34 changes: 23 additions & 11 deletions cmd/sqs-sqs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,51 @@ package main

import (
"context"
"os"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/aws/aws-sdk-go/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"

"github.com/ThreeDotsLabs/watermill-amazonsqs/connection"
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
)

func main() {
ctx := context.Background()
logger := watermill.NewStdLogger(true, true)

cfg := aws.Config{
Region: aws.String("eu-north-1"),
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion("eu-north-1"),
connection.SetEndPoint(os.Getenv("AWS_SNS_ENDPOINT")),
)
if err != nil {
panic(err)
}

pub, err := sqs.NewPublisher(sqs.PublisherConfig{
AWSConfig: cfg,
Marshaler: sqs.DefaultMarshalerUnmarshaler{},
AWSConfig: cfg,
CreateQueueIfNotExists: true,
Marshaler: sqs.DefaultMarshalerUnmarshaler{},
}, logger)
if err != nil {
panic(err)
}
_ = pub

sub, err := sqs.NewSubsciber(sqs.SubscriberConfig{
AWSConfig: cfg,
Unmarshaler: sqs.DefaultMarshalerUnmarshaler{},
sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
AWSConfig: cfg,
CreateQueueInitializerConfig: sqs.QueueConfigAtrributes{},
Unmarshaler: sqs.DefaultMarshalerUnmarshaler{},
}, logger)
if err != nil {
panic(err)
}

ctx := context.Background()
err = sub.SubscribeInitialize("any-topic")
if err != nil {
panic(err)
}

messages, err := sub.Subscribe(ctx, "any-topic")
if err != nil {
Expand Down
26 changes: 14 additions & 12 deletions connection/endpoint.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package connection

import (
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
)

const AWS_ENDPOINT = "AWS_ENDPOINT"

func SetEndPoint(config aws.Config) aws.Config {
newConfig := config
awsEndpoint := os.Getenv(AWS_ENDPOINT)
if awsEndpoint != "" {
newConfig.Endpoint = aws.String(awsEndpoint)
}
return newConfig
func SetEndPoint(endpoint string) config.LoadOptionsFunc {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func SetEndPoint(endpoint string) config.LoadOptionsFunc {
func EndpointOption(endpoint string) config.LoadOptionsFunc {

Since its no longer setting anything

Also the package connection is a bit iffy on this one, maybe config, where other config related functionality could reside in the future? Nitpick

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the review Nikola!

return config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
if endpoint != "" {
return aws.Endpoint{
PartitionID: "aws",
URL: endpoint,
SigningRegion: region,
}, nil
}
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
}))
}
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ go 1.12

require (
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/aws/aws-sdk-go v1.25.11
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2/config v1.18.25
github.com/aws/aws-sdk-go-v2/credentials v1.13.24
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0
github.com/aws/smithy-go v1.13.5
github.com/hashicorp/go-multierror v1.1.1
github.com/stretchr/testify v1.8.1
)
38 changes: 32 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,34 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aws/aws-sdk-go v1.25.11 h1:wUivbsVOH3LpHdC3Rl5i+FLHfg4sOmYgv4bvHe7+/Pg=
github.com/aws/aws-sdk-go v1.25.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/config v1.18.25 h1:JuYyZcnMPBiFqn87L2cRppo+rNwgah6YwD3VuyvaW6Q=
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 h1:PjiYyls3QdCrzqUN35jMWtUK1vqVZ+zLfdOa/UPFDp0=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 h1:jJPgroehGvjrde3XufFIJUZVK5A2L9a3KwSFgKy9n8w=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 h1:gGLG7yKaXG02/jBlg210R7VgQIotiQntNhsCFejawx8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAcCa2qVtRs7Ot5hItA2MsufrphbRFlz1Owxo=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11 h1:kUKAkuOhCCq/Av372Dtzg0oaAD5VEUYdDtU4lGIYKkw=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.11/go.mod h1:WjBcrd28zNbbuAcIRO/n89sSeOxTuOZPiuxNXU/2WrI=
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0 h1:ikSvot5NdywduxtkOwOa2GJFzFuJq1ZjXsGjoIA82Ao=
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0/go.mod h1:ujUjm+PrcKUeIiKu2PT7MWjcyY0D6YZRZF3fSswiO+0=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 h1:UBQjaMTCKwyUYwiVnUt6toEJwGXsLBI6al083tpjJzY=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 h1:PkHIIJs8qvq0e5QybnZoG1K/9QTrLr9OsqCIo59jOBA=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 h1:2DQLAKDteoEDI8zpCzqBMaZlJuoE9iTYD0gFmXVax9E=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -119,6 +145,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand All @@ -144,8 +171,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -312,7 +339,6 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -388,7 +414,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -529,6 +554,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
57 changes: 57 additions & 0 deletions sns/marshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sns

import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"

"github.com/ThreeDotsLabs/watermill/message"
)

const UUIDAttribute = "UUID"

type Marshaler interface {
Marshal(msg *message.Message) *sns.PublishInput
}

type DefaultMarshalerUnmarshaler struct{}

func (d DefaultMarshalerUnmarshaler) Marshal(msg *message.Message) *sns.PublishInput {
// client side uuid
// there is a deduplication id that can be use for
// fifo queues
attributes, deduplicationId, groupId := metadataToAttributes(msg.Metadata)
attributes[UUIDAttribute] = types.MessageAttributeValue{
StringValue: aws.String(msg.UUID),
DataType: aws.String("String"),
}

return &sns.PublishInput{
Message: aws.String(string(msg.Payload)),
MessageAttributes: attributes,
MessageDeduplicationId: deduplicationId,
MessageGroupId: groupId,
}
}

func metadataToAttributes(meta message.Metadata) (map[string]types.MessageAttributeValue, *string, *string) {
attributes := make(map[string]types.MessageAttributeValue)
var deduplicationId, groupId *string
for k, v := range meta {
// SNS has special attributes for deduplication and group id
if k == "MessageDeduplicationId" {
deduplicationId = aws.String(v)
continue
}
if k == "MessageGroupId" {
groupId = aws.String(v)
continue
}
attributes[k] = types.MessageAttributeValue{
StringValue: aws.String(v),
DataType: aws.String("String"),
}
}

return attributes, deduplicationId, groupId
}
Loading