Skip to content

Commit acbe529

Browse files
committed
refactor to use error in CommandProcessedEvent
1 parent a408ea2 commit acbe529

File tree

3 files changed

+21
-27
lines changed

3 files changed

+21
-27
lines changed

client.go

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -341,10 +341,10 @@ func (c *Client) Connect(req ConnectRequest) {
341341
_ = c.unidirectionalConnect(req.toProto(), 0, true)
342342
}
343343

344-
// ConnectNoDisconnect is the same as Client.Connect but does not try to extract Disconnect
345-
// code from the error returned by the connect logic, instead it just returns the error
346-
// to the caller. This error must be handled by the caller on the Transport level.
347-
func (c *Client) ConnectNoDisconnect(req ConnectRequest) error {
344+
// ConnectNoErrorToDisconnect is the same as Client.Connect but does not try to extract
345+
// Disconnect code from the error returned by the connect logic, instead it just returns
346+
// the error to the caller. This error must be handled by the caller on the Transport level.
347+
func (c *Client) ConnectNoErrorToDisconnect(req ConnectRequest) error {
348348
return c.unidirectionalConnect(req.toProto(), 0, false)
349349
}
350350

@@ -394,33 +394,27 @@ func (c *Client) unidirectionalConnect(connectRequest *protocol.ConnectRequest,
394394
cmd = &protocol.Command{Id: 1, Connect: connectRequest}
395395
err := c.issueCommandReadEvent(cmd, connectCmdSize)
396396
if err != nil {
397+
if c.node.clientEvents.commandProcessedHandler != nil {
398+
c.handleCommandFinished(cmd, protocol.FrameTypeConnect, err, nil, started)
399+
}
397400
if errorToDisconnect {
398401
d := extractUnidirectionalDisconnect(err)
399-
if c.node.clientEvents.commandProcessedHandler != nil {
400-
c.handleCommandFinished(cmd, protocol.FrameTypeConnect, &d, nil, started)
401-
}
402402
go func() { _ = c.close(d) }()
403403
return nil
404404
}
405-
if c.node.clientEvents.commandProcessedHandler != nil {
406-
c.handleCommandFinished(cmd, protocol.FrameTypeConnect, &DisconnectConnectionClosed, nil, started)
407-
}
408405
return err
409406
}
410407
}
411408
_, err := c.connectCmd(connectRequest, nil, time.Time{}, nil)
412409
if err != nil {
410+
if c.node.clientEvents.commandProcessedHandler != nil {
411+
c.handleCommandFinished(cmd, protocol.FrameTypeConnect, err, nil, started)
412+
}
413413
if errorToDisconnect {
414414
d := extractUnidirectionalDisconnect(err)
415-
if c.node.clientEvents.commandProcessedHandler != nil {
416-
c.handleCommandFinished(cmd, protocol.FrameTypeConnect, &d, nil, started)
417-
}
418415
go func() { _ = c.close(d) }()
419416
return nil
420417
}
421-
if c.node.clientEvents.commandProcessedHandler != nil {
422-
c.handleCommandFinished(cmd, protocol.FrameTypeConnect, &DisconnectConnectionClosed, nil, started)
423-
}
424418
return err
425419
}
426420
if c.node.clientEvents.commandProcessedHandler != nil {
@@ -1134,12 +1128,12 @@ func isPong(cmd *protocol.Command) bool {
11341128
return cmd.Id == 0 && cmd.Send == nil
11351129
}
11361130

1137-
func (c *Client) handleCommandFinished(cmd *protocol.Command, frameType protocol.FrameType, disconnect *Disconnect, reply *protocol.Reply, started time.Time) {
1131+
func (c *Client) handleCommandFinished(cmd *protocol.Command, frameType protocol.FrameType, err error, reply *protocol.Reply, started time.Time) {
11381132
defer func() {
11391133
c.node.metrics.observeCommandDuration(frameType, time.Since(started))
11401134
}()
11411135
if c.node.clientEvents.commandProcessedHandler != nil {
1142-
event := newCommandProcessedEvent(cmd, disconnect, reply, started)
1136+
event := newCommandProcessedEvent(cmd, err, reply, started)
11431137
c.issueCommandProcessedEvent(event)
11441138
}
11451139
}
@@ -1151,13 +1145,13 @@ func (c *Client) handleCommandDispatchError(ch string, cmd *protocol.Command, fr
11511145
switch t := err.(type) {
11521146
case *Disconnect:
11531147
if c.node.clientEvents.commandProcessedHandler != nil {
1154-
event := newCommandProcessedEvent(cmd, t, nil, started)
1148+
event := newCommandProcessedEvent(cmd, err, nil, started)
11551149
c.issueCommandProcessedEvent(event)
11561150
}
11571151
return t, false
11581152
case Disconnect:
11591153
if c.node.clientEvents.commandProcessedHandler != nil {
1160-
event := newCommandProcessedEvent(cmd, &t, nil, started)
1154+
event := newCommandProcessedEvent(cmd, err, nil, started)
11611155
c.issueCommandProcessedEvent(event)
11621156
}
11631157
return &t, false
@@ -1170,7 +1164,7 @@ func (c *Client) handleCommandDispatchError(ch string, cmd *protocol.Command, fr
11701164
errorReply := &protocol.Reply{Error: toClientErr(err).toProto()}
11711165
c.writeError(ch, frameType, cmd, errorReply, nil)
11721166
if c.node.clientEvents.commandProcessedHandler != nil {
1173-
event := newCommandProcessedEvent(cmd, nil, errorReply, started)
1167+
event := newCommandProcessedEvent(cmd, err, errorReply, started)
11741168
c.issueCommandProcessedEvent(event)
11751169
}
11761170
return nil, cmd.Connect == nil

client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3985,7 +3985,7 @@ func TestClientUnsubscribeDuringSubscribeCorrectChannels(t *testing.T) {
39853985
require.NoError(t, err)
39863986
}
39873987

3988-
func TestClientConnectNoDisconnect(t *testing.T) {
3988+
func TestClientConnectNoErrorToDisconnect(t *testing.T) {
39893989
t.Parallel()
39903990
errBoom := errors.New("boom")
39913991

@@ -4011,7 +4011,7 @@ func TestClientConnectNoDisconnect(t *testing.T) {
40114011
ctx := context.Background()
40124012
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
40134013
client, _ := newClient(newCtx, node, transport)
4014-
err := client.ConnectNoDisconnect(ConnectRequest{})
4014+
err := client.ConnectNoErrorToDisconnect(ConnectRequest{})
40154015
require.Equal(t, tt.Err, err)
40164016
})
40174017
}

events.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,8 @@ type CommandReadHandler func(*Client, CommandReadEvent) error
463463
type CommandProcessedEvent struct {
464464
// Command which was processed. May be pooled - see comment of CommandProcessedEvent.
465465
Command *protocol.Command
466-
// Disconnect may be set if Command processing resulted into disconnection.
467-
Disconnect *Disconnect
466+
// Error may be set to non-nil if Command processing resulted into error.
467+
Error error
468468
// Reply to the command. Reply may be pooled - see comment of CommandProcessedEvent.
469469
// This Reply may be nil in the following cases:
470470
// 1. For Send command since send commands do not have replies
@@ -477,8 +477,8 @@ type CommandProcessedEvent struct {
477477
}
478478

479479
// newCommandProcessedEvent is a helper to create CommandProcessedEvent.
480-
func newCommandProcessedEvent(command *protocol.Command, disconnect *Disconnect, reply *protocol.Reply, started time.Time) CommandProcessedEvent {
481-
return CommandProcessedEvent{Command: command, Disconnect: disconnect, Reply: reply, Started: started}
480+
func newCommandProcessedEvent(command *protocol.Command, err error, reply *protocol.Reply, started time.Time) CommandProcessedEvent {
481+
return CommandProcessedEvent{Command: command, Error: err, Reply: reply, Started: started}
482482
}
483483

484484
// CommandProcessedHandler allows setting a callback which will be called after

0 commit comments

Comments
 (0)