Skip to content

Commit 457e6e1

Browse files
committed
Merge branch 'master' into remove-auto-subscription-id
2 parents 5d8b863 + a5c2607 commit 457e6e1

1 file changed

Lines changed: 22 additions & 18 deletions

File tree

internal/invocation/invocation_service.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
2+
* Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License")
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package invocation
1919
import (
2020
"context"
2121
"fmt"
22-
"sync/atomic"
22+
"sync"
2323
"time"
2424

2525
"github.com/hazelcast/hazelcast-go-client/hzerrors"
@@ -29,11 +29,6 @@ import (
2929
"github.com/hazelcast/hazelcast-go-client/internal/proto"
3030
)
3131

32-
const (
33-
ready = 0
34-
stopped = 1
35-
)
36-
3732
var serviceSubID = event.NextSubscriptionID()
3833

3934
type Handler interface {
@@ -53,13 +48,11 @@ type Service struct {
5348
removeCh chan int64
5449
executor *stripeExecutor
5550
logger logger.LogAdaptor
56-
state int32
51+
stateMu *sync.RWMutex
52+
running bool
5753
}
5854

59-
func NewService(
60-
handler Handler,
61-
eventDispatcher *event.DispatchService,
62-
logger logger.LogAdaptor) *Service {
55+
func NewService(handler Handler, ed *event.DispatchService, lg logger.LogAdaptor) *Service {
6356
s := &Service{
6457
requestCh: make(chan Invocation),
6558
urgentRequestCh: make(chan Invocation),
@@ -69,14 +62,20 @@ func NewService(
6962
groupLostCh: make(chan *GroupLostEvent),
7063
invocations: map[int64]Invocation{},
7164
handler: handler,
72-
eventDispatcher: eventDispatcher,
73-
logger: logger,
74-
state: ready,
65+
eventDispatcher: ed,
66+
logger: lg,
67+
stateMu: &sync.RWMutex{},
68+
running: true,
7569
executor: newStripeExecutor(),
7670
}
7771
s.eventDispatcher.Subscribe(EventGroupLost, serviceSubID, func(event event.Event) {
7872
go func() {
79-
s.groupLostCh <- event.(*GroupLostEvent)
73+
select {
74+
case s.groupLostCh <- event.(*GroupLostEvent):
75+
return
76+
case <-s.doneCh:
77+
return
78+
}
8079
}()
8180
})
8281
s.executor.start()
@@ -85,9 +84,12 @@ func NewService(
8584
}
8685

8786
func (s *Service) Stop() {
88-
if !atomic.CompareAndSwapInt32(&s.state, ready, stopped) {
87+
s.stateMu.Lock()
88+
defer s.stateMu.Unlock()
89+
if !s.running {
8990
return
9091
}
92+
s.running = false
9193
s.executor.stop()
9294
close(s.doneCh)
9395
}
@@ -249,7 +251,9 @@ func (s *Service) unregisterInvocation(correlationID int64) Invocation {
249251
}
250252

251253
func (s *Service) handleGroupLost(e *GroupLostEvent) {
252-
if atomic.LoadInt32(&s.state) != ready {
254+
s.stateMu.RLock()
255+
defer s.stateMu.RUnlock()
256+
if !s.running {
253257
return
254258
}
255259
for corrID, inv := range s.invocations {

0 commit comments

Comments
 (0)