Skip to content

Commit 2be0820

Browse files
packrat386mlarraz
authored andcommitted
Parse message bodies with complex types (#4)
We assumed initially that every field in an SNS message would be a string. However some of them can be complex objects. We can avoid having to figure that out by just parsing them all to json.RawMessage and then parsing the only two fields we actually need to strings.
1 parent 1a1fb53 commit 2be0820

File tree

3 files changed

+51
-6
lines changed

3 files changed

+51
-6
lines changed

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func init() {
2424
Poll SQS queues specified in a config and enqueue Sidekiq jobs with the queue items.
2525
It gracefully stops when sent SIGTERM.`
2626

27-
app.Version = "1.3"
27+
app.Version = "1.4"
2828

2929
app.Flags = []cli.Flag{
3030
cli.StringFlag{

queue.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,26 +91,39 @@ func (q *queue) deleteMessage(msg Message, ctx log.FieldLogger) {
9191

9292
// enqueueMessage pushes a single message from SQS into redis
9393
func (q *queue) enqueueMessage(msg Message, ctx log.FieldLogger) bool {
94-
body := make(map[string]string)
94+
body := make(map[string]json.RawMessage)
9595
err := json.Unmarshal([]byte(msg.Body), &body)
9696
if err != nil {
9797
ctx.Warn("Message body could not be parsed: ", err.Error())
9898
return true
9999
}
100100

101-
workerClass, ok := q.Topics[topicName(body["TopicArn"])]
101+
var topicARN string
102+
err = json.Unmarshal(body["TopicArn"], &topicARN)
103+
if err != nil {
104+
ctx.Warn("Topic ARN could not be parsed: ", err.Error())
105+
return true
106+
}
107+
108+
workerClass, ok := q.Topics[topicName(topicARN)]
102109
if !ok {
103-
ctx.Warn("No worker for topic: ", topicName(body["TopicArn"]))
110+
ctx.Warn("No worker for topic: ", topicName(topicARN))
104111
return true
105112
}
106113

107-
jid, err := q.WorkerClient.Push(workerClass, body["Message"])
114+
var bodyMessage string
115+
err = json.Unmarshal(body["Message"], &bodyMessage)
116+
if err != nil {
117+
ctx.Warn("'Message' field could not be parsed: ", err.Error())
118+
}
119+
120+
jid, err := q.WorkerClient.Push(workerClass, bodyMessage)
108121
if err != nil {
109122
ctx.WithField("Class", workerClass).Error("Couldn't enqueue worker: ", err.Error())
110123
return false
111124
}
112125

113-
ctx.WithField("Args", body["Message"]).Info("Enqueued job: ", jid)
126+
ctx.WithField("Args", bodyMessage).Info("Enqueued job: ", jid)
114127
return true
115128
}
116129

queue_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"errors"
56
"sync"
67
"testing"
@@ -133,6 +134,37 @@ func (q *QueueTestSuite) TestQueue_UnparseableBody() {
133134
q.assert.Contains(q.sqsClient.Deleted, badMessage)
134135
}
135136

137+
func (q *QueueTestSuite) TestQueue_ComplexBody() {
138+
// make a message with a body that cannot be represented as map[string]string
139+
msg := map[string]interface{}{
140+
"Message": `{"foo":"bar"}`,
141+
"TopicArn": "topicA",
142+
"Some": map[string]string{"other": "data"},
143+
}
144+
145+
data, err := json.Marshal(msg)
146+
if err != nil {
147+
panic(err)
148+
}
149+
150+
message := Message{Body: string(data)}
151+
152+
// set the mock to return that message
153+
q.sqsClient.Fetchable = []Message{message}
154+
155+
// make a some topics
156+
q.queue.Topics["topicA"] = "WorkerA"
157+
158+
// do the work
159+
q.queue.Poll()
160+
161+
// The worker should be enqueued
162+
q.assert.Contains(q.workerClient.Enqueued, []string{"WorkerA", `{"foo":"bar"}`})
163+
164+
// The message should be deleted
165+
q.assert.Contains(q.sqsClient.Deleted, message)
166+
}
167+
136168
func (q *QueueTestSuite) TestQueue_EnqueueError() {
137169
// make a messages
138170
message1 := MockMessage(`{"foo":"bar"}`, "topicA")

0 commit comments

Comments
 (0)