Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions pkg/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
}

// start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop. start must be called after
// subscribe.
func (em *eventMonitor) start() (<-chan struct{}, error) {
// an error channel for the caller to wait for stop errors from the event monitor.
// start must be called after subscribe.
func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil {
return nil, errors.New("event channel is nil")
panic("event channel is nil")
}
closeCh := make(chan struct{})
backOffCheckCh := em.backOff.start()
go func() {
defer close(errCh)
for {
select {
case e := <-em.ch:
Expand All @@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
em.backOff.enBackOff(cID, evt)
}
case err := <-em.errCh:
logrus.WithError(err).Error("Failed to handle event stream")
close(closeCh)
// Close errCh in defer directly if there is no error.
if err != nil {
logrus.WithError(err).Errorf("Failed to handle event stream")
errCh <- err
}
return
case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers()
Expand All @@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
}
}
}()
return closeCh, nil
return errCh
}

// stop stops the event monitor. It will close the event channel.
Expand Down
40 changes: 26 additions & 14 deletions pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package server
import (
"fmt"
"io"
"net/http"
"path/filepath"
"time"

Expand Down Expand Up @@ -179,10 +180,7 @@ func (c *criService) Run() error {

// Start event handler.
logrus.Info("Start event monitor")
eventMonitorCloseCh, err := c.eventMonitor.start()
if err != nil {
return errors.Wrap(err, "failed to start event monitor")
}
eventMonitorErrCh := c.eventMonitor.start()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can still check if eventMonitorErrCh == nil and leave the old return message if it does, though the make chan error should always work...

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, don't think we need to check. :) If it is nil, it is our problem.


// Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer")
Expand All @@ -195,27 +193,32 @@ func (c *criService) Run() error {

// Start streaming server.
logrus.Info("Start streaming server")
streamServerCloseCh := make(chan struct{})
streamServerErrCh := make(chan error)
go func() {
if err := c.streamServer.Start(true); err != nil {
defer close(streamServerErrCh)
if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed {
logrus.WithError(err).Error("Failed to start streaming server")
streamServerErrCh <- err
}
close(streamServerCloseCh)
}()

// Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set()

var eventMonitorErr, streamServerErr error
// Stop the whole CRI service if any of the critical service exits.
select {
case <-eventMonitorCloseCh:
case <-streamServerCloseCh:
case eventMonitorErr = <-eventMonitorErrCh:
case streamServerErr = <-streamServerErrCh:
}
if err := c.Close(); err != nil {
return errors.Wrap(err, "failed to stop cri service")
}

<-eventMonitorCloseCh
// If the error is set above, err from channel must be nil here, because
// the channel is supposed to be closed. Or else, we wait and set it.
if err := <-eventMonitorErrCh; err != nil {
eventMonitorErr = err
}
logrus.Info("Event monitor stopped")
// There is a race condition with http.Server.Serve.
// When `Close` is called at the same time with `Serve`, `Close`
Expand All @@ -227,18 +230,27 @@ func (c *criService) Run() error {
// is fixed.
const streamServerStopTimeout = 2 * time.Second
select {
case <-streamServerCloseCh:
case err := <-streamServerErrCh:
if err != nil {
streamServerErr = err

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above ..

@Random-Liu Random-Liu May 31, 2018

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Actually, I plan to move the channel wait logic into the stop function, which should make things more clear.

Basically what we are doing is:

  1. Wait for either event monitor and stream server to stop;
  2. Event monitor and stream server are both important system component. If one of them stops, we gracefully stop CRI plugin and report an error.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kk cool

}
logrus.Info("Stream server stopped")
case <-time.After(streamServerStopTimeout):
logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout)
}
if eventMonitorErr != nil {
return errors.Wrap(eventMonitorErr, "event monitor error")
}
if streamServerErr != nil {
return errors.Wrap(streamServerErr, "stream server error")
}
return nil
}

// Stop stops the CRI service.
// Close stops the CRI service.
// TODO(random-liu): Make close synchronous.
func (c *criService) Close() error {
logrus.Info("Stop CRI service")
// TODO(random-liu): Make event monitor stop synchronous.
c.eventMonitor.stop()
if err := c.streamServer.Stop(); err != nil {
return errors.Wrap(err, "failed to stop stream server")
Expand Down