Skip to content

Commit ac68b63

Browse files
committed
Use a WaitGroup to stop runnableServers
Manager doesn't actually wait for Runnables to stop, so we need to add a WaitGroup to wait for the HTTP Server Shutdown to complete. This is hopefully temporary until kubernetes-sigs/controller-runtime#350 is fixed.
1 parent f7e3a62 commit ac68b63

File tree

1 file changed

+32
-12
lines changed

1 file changed

+32
-12
lines changed

cmd/broker/ingress/main.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"log"
2424
"net/http"
2525
"os"
26+
"sync"
2627
"time"
2728

2829
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
@@ -43,6 +44,7 @@ var (
4344

4445
readTimeout = 1 * time.Minute
4546
writeTimeout = 1 * time.Minute
47+
wg sync.WaitGroup
4648
)
4749

4850
func main() {
@@ -75,8 +77,10 @@ func main() {
7577
}
7678

7779
err = mgr.Add(&runnableServer{
78-
logger: logger,
79-
s: s,
80+
logger: logger,
81+
s: s,
82+
ShutdownTimeout: writeTimeout,
83+
wg: &wg,
8084
})
8185
if err != nil {
8286
logger.Fatal("Unable to add runnableServer", zap.Error(err))
@@ -102,6 +106,7 @@ func main() {
102106
s: metricsSrv,
103107
logger: logger,
104108
ShutdownTimeout: writeTimeout,
109+
wg: &wg,
105110
})
106111
if err != nil {
107112
logger.Fatal("Unable to add metrics runnableServer", zap.Error(err))
@@ -114,12 +119,8 @@ func main() {
114119
logger.Error("manager.Start() returned an error", zap.Error(err))
115120
}
116121
logger.Info("Exiting...")
117-
118-
ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
119-
defer cancel()
120-
if err = s.Shutdown(ctx); err != nil {
121-
logger.Error("Shutdown returned an error", zap.Error(err))
122-
}
122+
wg.Wait()
123+
logger.Info("Done.")
123124
}
124125

125126
func getRequiredEnv(envKey string) string {
@@ -202,13 +203,31 @@ type runnableServer struct {
202203
// shutdown will never time out.
203204
// TODO alternative: zero shuts down immediately, negative means infinite
204205
ShutdownTimeout time.Duration
206+
// wg is a temporary workaround for Manager returning immediately without
207+
// waiting for Runnables to stop. See
208+
// https://github.com/kubernetes-sigs/controller-runtime/issues/350.
209+
wg *sync.WaitGroup
205210
}
206211

207212
func (r *runnableServer) Start(stopCh <-chan struct{}) error {
208213
logger := r.logger.With(zap.String("address", r.s.Addr))
209214
logger.Info("Listening...")
215+
216+
errCh := make(chan error)
217+
210218
go func() {
211-
<-stopCh
219+
r.wg.Add(1)
220+
err := r.s.ListenAndServe()
221+
if err != http.ErrServerClosed {
222+
errCh <- err
223+
}
224+
}()
225+
226+
var err error
227+
select {
228+
case err = <-errCh:
229+
logger.Error("Error running HTTP server", zap.Error(err))
230+
case <-stopCh:
212231
var ctx context.Context
213232
var cancel context.CancelFunc
214233
if r.ShutdownTimeout > 0 {
@@ -218,11 +237,12 @@ func (r *runnableServer) Start(stopCh <-chan struct{}) error {
218237
ctx = context.Background()
219238
}
220239
logger.Info("Shutting down...")
221-
if err := r.s.Shutdown(ctx); err != nil {
240+
if err = r.s.Shutdown(ctx); err != nil {
222241
logger.Error("Shutdown returned an error", zap.Error(err))
223242
} else {
224243
logger.Info("Shutdown done")
225244
}
226-
}()
227-
return r.s.ListenAndServe()
245+
}
246+
r.wg.Done()
247+
return err
228248
}

0 commit comments

Comments
 (0)