Skip to content

Commit e752104

Browse files
authored
Merge pull request #121 from go-stomp/fix-issue-120
fix: wrong header sent for explicit acknowledgment
2 parents b48ba3c + a55e4a5 commit e752104

File tree

5 files changed

+29
-21
lines changed

5 files changed

+29
-21
lines changed

conn.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package stomp
22

33
import (
44
"errors"
5-
"github.com/go-stomp/stomp/v3/frame"
65
"io"
76
"net"
87
"strconv"
98
"sync"
109
"time"
10+
11+
"github.com/go-stomp/stomp/v3/frame"
1112
)
1213

1314
// Default time span to add to read/write heart-beat timeouts
@@ -750,10 +751,12 @@ func (c *Conn) createAckNackFrame(msg *Message, ack bool) (*frame.Frame, error)
750751
return nil, missingHeader(frame.MessageId)
751752
}
752753
case V12:
753-
if ack, ok := msg.Header.Contains(frame.Id); ok {
754+
// message frame contains ack header
755+
if ack, ok := msg.Header.Contains(frame.Ack); ok {
756+
// ack frame should reference it as id
754757
f.Header.Add(frame.Id, ack)
755758
} else {
756-
return nil, missingHeader(frame.Id)
759+
return nil, missingHeader(frame.Ack)
757760
}
758761
}
759762

conn_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (rw *fakeReaderWriter) Close() error {
3232

3333
func (s *StompSuite) Test_conn_option_set_logger(c *C) {
3434
fc1, fc2 := testutil.NewFakeConn(c)
35-
go func() {
35+
go func() {
3636

3737
defer func() {
3838
fc2.Close()
@@ -384,8 +384,8 @@ func subscribeHelper(c *C, ackMode AckMode, version Version, opts ...func(*frame
384384
frame.Subscription, id,
385385
frame.MessageId, messageId,
386386
frame.Destination, destination)
387-
if version == V12 {
388-
f4.Header.Add(frame.Id, messageId)
387+
if version == V12 && ackMode.ShouldAck() {
388+
f4.Header.Add(frame.Ack, messageId)
389389
}
390390
f4.Body = []byte(bodyText)
391391
err = rw.Write(f4)
@@ -395,7 +395,7 @@ func subscribeHelper(c *C, ackMode AckMode, version Version, opts ...func(*frame
395395
f5, _ := rw.Read()
396396
c.Assert(f5.Command, Equals, "ACK")
397397
if version == V12 {
398-
c.Assert(f5.Header.Get("id"), Equals, messageId)
398+
c.Assert(f5.Header.Get(frame.Id), Equals, messageId)
399399
} else {
400400
c.Assert(f5.Header.Get("subscription"), Equals, id)
401401
c.Assert(f5.Header.Get("message-id"), Equals, messageId)
@@ -431,6 +431,9 @@ func subscribeHelper(c *C, ackMode AckMode, version Version, opts ...func(*frame
431431
c.Assert(msg.Body, DeepEquals, []byte(bodyText))
432432
c.Assert(msg.Destination, Equals, "/queue/test-1")
433433
c.Assert(msg.Header.Get(frame.MessageId), Equals, messageId)
434+
if version == V12 && ackMode.ShouldAck() {
435+
c.Assert(msg.Header.Get(frame.Ack), Equals, messageId)
436+
}
434437

435438
c.Assert(msg.ShouldAck(), Equals, ackMode.ShouldAck())
436439
if msg.ShouldAck() {
@@ -491,8 +494,8 @@ func subscribeTransactionHelper(c *C, ackMode AckMode, version Version, abort bo
491494
frame.Subscription, id,
492495
frame.MessageId, messageId,
493496
frame.Destination, destination)
494-
if version == V12 {
495-
f4.Header.Add(frame.Id, messageId)
497+
if version == V12 && ackMode.ShouldAck() {
498+
f4.Header.Add(frame.Ack, messageId)
496499
}
497500
f4.Body = []byte(bodyText)
498501
err = rw.Write(f4)
@@ -514,7 +517,7 @@ func subscribeTransactionHelper(c *C, ackMode AckMode, version Version, abort bo
514517
c.Assert(f5.Command, Equals, "ACK")
515518
}
516519
if version == V12 {
517-
c.Assert(f5.Header.Get("id"), Equals, messageId)
520+
c.Assert(f5.Header.Get(frame.Id), Equals, messageId)
518521
} else {
519522
c.Assert(f5.Header.Get("subscription"), Equals, id)
520523
c.Assert(f5.Header.Get("message-id"), Equals, messageId)

examples/client_test/client_test

-2.41 MB
Binary file not shown.

server/client/conn.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -406,18 +406,19 @@ func (c *Conn) cleanupSubChannel() {
406406

407407
// Send a frame to the client, allocating necessary headers prior.
408408
func (c *Conn) allocateMessageId(f *frame.Frame, sub *Subscription) {
409-
if f.Command == frame.MESSAGE {
409+
if f.Command == frame.MESSAGE || f.Command == frame.ACK {
410410
// allocate the value of message-id for this frame
411411
c.lastMsgId++
412412
messageId := strconv.FormatUint(c.lastMsgId, 10)
413413
f.Header.Set(frame.MessageId, messageId)
414+
f.Header.Set(frame.Id, messageId)
414415

415416
// if there is any requirement by the client to acknowledge, set
416417
// the ack header as per STOMP 1.2
417418
if sub == nil || sub.ack == frame.AckAuto {
418-
f.Header.Del(frame.Id)
419+
f.Header.Del(frame.Ack)
419420
} else {
420-
f.Header.Set(frame.Id, messageId)
421+
f.Header.Set(frame.Ack, messageId)
421422
}
422423
}
423424
}
@@ -659,7 +660,7 @@ func (c *Conn) handleAck(f *frame.Frame) error {
659660
var err error
660661
var msgId string
661662

662-
if ack, ok := f.Header.Contains(frame.Id); ok {
663+
if ack, ok := f.Header.Contains(frame.Ack); ok {
663664
msgId = ack
664665
} else if msgId, ok = f.Header.Contains(frame.MessageId); !ok {
665666
return missingHeader(frame.MessageId)
@@ -702,7 +703,7 @@ func (c *Conn) handleNack(f *frame.Frame) error {
702703
var err error
703704
var msgId string
704705

705-
if ack, ok := f.Header.Contains(frame.Id); ok {
706+
if ack, ok := f.Header.Contains(frame.Ack); ok {
706707
msgId = ack
707708
} else if msgId, ok = f.Header.Contains(frame.MessageId); !ok {
708709
return missingHeader(frame.MessageId)

server/server_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,27 +50,28 @@ func (s *ServerSuite) TestConnectAndDisconnect(c *C) {
5050
func (s *ServerSuite) TestHeartBeatingTolerance(c *C) {
5151
// Heart beat should not close connection exactly after not receiving message after cx
5252
// it should add a pretty decent amount of time to counter network delay of other timing issues
53-
addr := ":59092"
54-
l, err := net.Listen("tcp", addr)
53+
l, err := net.Listen("tcp", `127.0.0.1:0`)
5554
c.Assert(err, IsNil)
5655
defer func() { l.Close() }()
5756
serv := Server{
58-
Addr: "",
57+
Addr: l.Addr().String(),
5958
Authenticator: nil,
6059
QueueStorage: nil,
6160
HeartBeat: 5 * time.Millisecond,
6261
}
6362
go serv.Serve(l)
6463

65-
conn, err := net.Dial("tcp", "127.0.0.1"+addr)
64+
conn, err := net.Dial("tcp", l.Addr().String())
6665
c.Assert(err, IsNil)
6766
defer conn.Close()
6867

69-
client, err := stomp.Connect(conn, stomp.ConnOpt.HeartBeat(5 * time.Millisecond, 5 * time.Millisecond))
68+
client, err := stomp.Connect(conn,
69+
stomp.ConnOpt.HeartBeat(5 * time.Millisecond, 5 * time.Millisecond),
70+
)
7071
c.Assert(err, IsNil)
7172
defer client.Disconnect()
7273

73-
time.Sleep(serv.HeartBeat * 50) // let it go for some time to allow client and server to exchange some heart beat
74+
time.Sleep(serv.HeartBeat * 20) // let it go for some time to allow client and server to exchange some heart beat
7475

7576
// Ensure the server has not closed his readChannel
7677
err = client.Send("/topic/whatever", "text/plain", []byte("hello"))

0 commit comments

Comments
 (0)