11/*
2- * Copyright (c) 2008-2021 , Hazelcast, Inc. All Rights Reserved.
2+ * Copyright (c) 2008-2022 , Hazelcast, Inc. All Rights Reserved.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License")
55 * you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package invocation
1919import (
2020 "context"
2121 "fmt"
22- "sync/atomic "
22+ "sync"
2323 "time"
2424
2525 "github.com/hazelcast/hazelcast-go-client/hzerrors"
@@ -29,11 +29,6 @@ import (
2929 "github.com/hazelcast/hazelcast-go-client/internal/proto"
3030)
3131
32- const (
33- ready = 0
34- stopped = 1
35- )
36-
3732var serviceSubID = event .NextSubscriptionID ()
3833
3934type Handler interface {
@@ -53,13 +48,11 @@ type Service struct {
5348 removeCh chan int64
5449 executor * stripeExecutor
5550 logger logger.LogAdaptor
56- state int32
51+ stateMu * sync.RWMutex
52+ running bool
5753}
5854
59- func NewService (
60- handler Handler ,
61- eventDispatcher * event.DispatchService ,
62- logger logger.LogAdaptor ) * Service {
55+ func NewService (handler Handler , ed * event.DispatchService , lg logger.LogAdaptor ) * Service {
6356 s := & Service {
6457 requestCh : make (chan Invocation ),
6558 urgentRequestCh : make (chan Invocation ),
@@ -69,14 +62,20 @@ func NewService(
6962 groupLostCh : make (chan * GroupLostEvent ),
7063 invocations : map [int64 ]Invocation {},
7164 handler : handler ,
72- eventDispatcher : eventDispatcher ,
73- logger : logger ,
74- state : ready ,
65+ eventDispatcher : ed ,
66+ logger : lg ,
67+ stateMu : & sync.RWMutex {},
68+ running : true ,
7569 executor : newStripeExecutor (),
7670 }
7771 s .eventDispatcher .Subscribe (EventGroupLost , serviceSubID , func (event event.Event ) {
7872 go func () {
79- s .groupLostCh <- event .(* GroupLostEvent )
73+ select {
74+ case s .groupLostCh <- event .(* GroupLostEvent ):
75+ return
76+ case <- s .doneCh :
77+ return
78+ }
8079 }()
8180 })
8281 s .executor .start ()
@@ -85,9 +84,12 @@ func NewService(
8584}
8685
8786func (s * Service ) Stop () {
88- if ! atomic .CompareAndSwapInt32 (& s .state , ready , stopped ) {
87+ s .stateMu .Lock ()
88+ defer s .stateMu .Unlock ()
89+ if ! s .running {
8990 return
9091 }
92+ s .running = false
9193 s .executor .stop ()
9294 close (s .doneCh )
9395}
@@ -249,7 +251,9 @@ func (s *Service) unregisterInvocation(correlationID int64) Invocation {
249251}
250252
251253func (s * Service ) handleGroupLost (e * GroupLostEvent ) {
252- if atomic .LoadInt32 (& s .state ) != ready {
254+ s .stateMu .RLock ()
255+ defer s .stateMu .RUnlock ()
256+ if ! s .running {
253257 return
254258 }
255259 for corrID , inv := range s .invocations {
0 commit comments