Skip to content

Commit fabf69f

Browse files
committed
[public-api-server] Group http and grpc servers into a baseserver package
1 parent 55d1e4b commit fabf69f

File tree

12 files changed

+516
-74
lines changed

12 files changed

+516
-74
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
/components/blobserve @gitpod-io/engineering-workspace
88
/components/common-go @gitpod-io/engineering-workspace
9+
/components/common-go/baseserver @gitpod-io/engineering-webapp
910
/components/content-service-api @csweichel @geropl
1011
/components/content-service @gitpod-io/engineering-workspace
1112
/components/dashboard @gitpod-io/engineering-webapp
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
5+
package baseserver
6+
7+
import (
8+
"fmt"
9+
"github.com/gitpod-io/gitpod/common-go/log"
10+
"github.com/sirupsen/logrus"
11+
"time"
12+
)
13+
14+
type config struct {
15+
logger *logrus.Entry
16+
17+
// hostname is the hostname on which our servers will listen.
18+
hostname string
19+
// grpcPort is the port we listen on for gRPC traffic
20+
grpcPort int
21+
// httpPort is the port we listen on for HTTP traffic
22+
httpPort int
23+
24+
// closeTimeout is the amount we allow for the server to shut down cleanly
25+
closeTimeout time.Duration
26+
}
27+
28+
func defaultConfig() *config {
29+
return &config{
30+
logger: log.New(),
31+
hostname: "localhost",
32+
httpPort: 9000,
33+
grpcPort: 9001,
34+
closeTimeout: 5 * time.Second,
35+
}
36+
}
37+
38+
type Option func(cfg *config) error
39+
40+
func WithHostname(hostname string) Option {
41+
return func(cfg *config) error {
42+
cfg.hostname = hostname
43+
return nil
44+
}
45+
}
46+
47+
func WithHTTPPort(port int) Option {
48+
return func(cfg *config) error {
49+
if port < 0 {
50+
return fmt.Errorf("http port must be greater than 0, got: %d", port)
51+
}
52+
53+
cfg.httpPort = port
54+
return nil
55+
}
56+
}
57+
58+
func WithGRPCPort(port int) Option {
59+
return func(cfg *config) error {
60+
if port < 0 {
61+
return fmt.Errorf("grpc port must be greater than 0, got: %d", port)
62+
}
63+
64+
cfg.grpcPort = port
65+
return nil
66+
}
67+
}
68+
69+
func WithLogger(logger *logrus.Entry) Option {
70+
return func(cfg *config) error {
71+
if logger == nil {
72+
return fmt.Errorf("nil logger specified")
73+
}
74+
75+
cfg.logger = logger
76+
return nil
77+
}
78+
}
79+
80+
func WithCloseTimeout(d time.Duration) Option {
81+
return func(cfg *config) error {
82+
cfg.closeTimeout = d
83+
return nil
84+
}
85+
}
86+
87+
func evaluateOptions(cfg *config, opts ...Option) (*config, error) {
88+
for _, opt := range opts {
89+
if err := opt(cfg); err != nil {
90+
return nil, fmt.Errorf("failed to evaluate config: %w", err)
91+
}
92+
}
93+
94+
return cfg, nil
95+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
5+
package baseserver
6+
7+
import (
8+
"github.com/gitpod-io/gitpod/common-go/log"
9+
"github.com/stretchr/testify/require"
10+
"testing"
11+
"time"
12+
)
13+
14+
func TestOptions(t *testing.T) {
15+
logger := log.New()
16+
httpPort := 8080
17+
grpcPort := 8081
18+
timeout := 10 * time.Second
19+
hostname := "another_hostname"
20+
21+
var opts = []Option{
22+
WithHostname(hostname),
23+
WithHTTPPort(httpPort),
24+
WithGRPCPort(grpcPort),
25+
WithLogger(logger),
26+
WithCloseTimeout(timeout),
27+
}
28+
cfg, err := evaluateOptions(defaultConfig(), opts...)
29+
require.NoError(t, err)
30+
31+
require.Equal(t, &config{
32+
logger: logger,
33+
hostname: hostname,
34+
grpcPort: grpcPort,
35+
httpPort: httpPort,
36+
closeTimeout: timeout,
37+
}, cfg)
38+
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
5+
package baseserver
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"github.com/sirupsen/logrus"
11+
"google.golang.org/grpc"
12+
"net"
13+
"net/http"
14+
"os"
15+
"os/signal"
16+
"syscall"
17+
)
18+
19+
func New(name string, opts ...Option) (*Server, error) {
20+
cfg, err := evaluateOptions(defaultConfig(), opts...)
21+
if err != nil {
22+
return nil, fmt.Errorf("invalid config: %w", err)
23+
}
24+
25+
server := &Server{
26+
Name: name,
27+
cfg: cfg,
28+
}
29+
30+
if initErr := server.initializeHTTP(); initErr != nil {
31+
return nil, fmt.Errorf("failed to initialize http server: %w", initErr)
32+
}
33+
if initErr := server.initializeGRPC(); initErr != nil {
34+
return nil, fmt.Errorf("failed to initialize grpc server: %w", initErr)
35+
}
36+
37+
return server, nil
38+
}
39+
40+
type Server struct {
41+
// Name is the name of this server, used for logging context
42+
Name string
43+
44+
cfg *config
45+
46+
// http is an http Server
47+
http *http.Server
48+
httpMux *http.ServeMux
49+
httpListener net.Listener
50+
51+
// grpc is a grpc Server
52+
grpc *grpc.Server
53+
grpcListener net.Listener
54+
55+
// listening indicates the server is serving. When closed, the server is in the process of graceful termination.
56+
listening chan struct{}
57+
}
58+
59+
func (s *Server) ListenAndServe() error {
60+
var err error
61+
s.grpcListener, err = net.Listen("tcp", fmt.Sprintf(":%d", s.cfg.grpcPort))
62+
if err != nil {
63+
return fmt.Errorf("failed to acquire port %d", s.cfg.grpcPort)
64+
}
65+
66+
s.httpListener, err = net.Listen("tcp", fmt.Sprintf(":%d", s.cfg.httpPort))
67+
if err != nil {
68+
return fmt.Errorf("failed to acquire port %d", s.cfg.grpcPort)
69+
}
70+
71+
errors := make(chan error)
72+
defer close(errors)
73+
s.listening = make(chan struct{})
74+
75+
go func() {
76+
s.Logger().
77+
WithField("protocol", "grpc").
78+
Infof("Serving gRPC on %s", s.grpcListener.Addr().String())
79+
if serveErr := s.grpc.Serve(s.grpcListener); serveErr != nil {
80+
if s.isClosing() {
81+
return
82+
}
83+
84+
errors <- serveErr
85+
}
86+
}()
87+
88+
go func() {
89+
s.Logger().
90+
WithField("protocol", "http").
91+
Infof("Serving http on %s", s.httpListener.Addr().String())
92+
if serveErr := s.http.Serve(s.httpListener); serveErr != nil {
93+
if s.isClosing() {
94+
return
95+
}
96+
97+
errors <- serveErr
98+
}
99+
}()
100+
101+
signals := make(chan os.Signal, 1)
102+
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
103+
104+
// Await operating system signals, or server errors.
105+
select {
106+
case sig := <-signals:
107+
s.Logger().Infof("Received system signal %s, closing server.", sig.String())
108+
if closeErr := s.Close(); closeErr != nil {
109+
s.Logger().WithError(closeErr).Error("Failed to close server.")
110+
return closeErr
111+
}
112+
113+
return nil
114+
case serverErr := <-errors:
115+
s.Logger().WithError(serverErr).Errorf("Server encountered an error. Closing remaining servers.")
116+
if closeErr := s.Close(); closeErr != nil {
117+
return fmt.Errorf("failed to close server after one of the servers errored: %w", closeErr)
118+
}
119+
120+
return serverErr
121+
}
122+
}
123+
124+
func (s *Server) Close() error {
125+
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.closeTimeout)
126+
defer cancel()
127+
128+
return s.close(ctx)
129+
}
130+
131+
func (s *Server) Logger() *logrus.Entry {
132+
return s.cfg.logger
133+
}
134+
135+
func (s *Server) HTTPAddress() string {
136+
protocol := "http"
137+
return fmt.Sprintf("%s://%s:%d", protocol, s.cfg.hostname, s.cfg.httpPort)
138+
}
139+
140+
func (s *Server) GRPCAddress() string {
141+
protocol := "http"
142+
return fmt.Sprintf("%s://%s:%d", protocol, s.cfg.hostname, s.cfg.grpcPort)
143+
}
144+
145+
func (s *Server) HTTPMux() *http.ServeMux {
146+
return s.httpMux
147+
}
148+
149+
func (s *Server) close(ctx context.Context) error {
150+
if s.listening == nil {
151+
return fmt.Errorf("server is not running, invalid close operaiton")
152+
}
153+
154+
if s.isClosing() {
155+
s.Logger().Info("Server is already closing.")
156+
return nil
157+
}
158+
159+
s.Logger().Info("Received graceful shutdown request.")
160+
close(s.listening)
161+
162+
s.grpc.GracefulStop()
163+
// s.grpc.GracefulStop() also closes the underlying net.Listener, we just release the reference.
164+
s.grpcListener = nil
165+
s.Logger().Info("GRPC server terminated.")
166+
167+
if err := s.http.Shutdown(ctx); err != nil {
168+
return fmt.Errorf("failed to close http server: %w", err)
169+
}
170+
// s.http.Shutdown() also closes the underlying net.Listener, we just release the reference.
171+
s.httpListener = nil
172+
s.Logger().Info("HTTP server terminated.")
173+
174+
return nil
175+
}
176+
177+
func (s *Server) isClosing() bool {
178+
select {
179+
case <-s.listening:
180+
// listening channel is closed, we're in graceful shutdown mode
181+
return true
182+
default:
183+
return false
184+
}
185+
}
186+
187+
func (s *Server) initializeHTTP() error {
188+
s.httpMux = s.newHTTPMux()
189+
s.http = &http.Server{
190+
Addr: fmt.Sprintf(":%d", s.cfg.httpPort),
191+
Handler: s.httpMux,
192+
}
193+
194+
return nil
195+
}
196+
197+
func (s *Server) newHTTPMux() *http.ServeMux {
198+
mux := http.NewServeMux()
199+
// TODO(milan): Use a ready/health package already used in ws-manager
200+
mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
201+
_, _ = w.Write([]byte(`ready`))
202+
})
203+
204+
return mux
205+
}
206+
207+
func (s *Server) initializeGRPC() error {
208+
s.grpc = grpc.NewServer()
209+
210+
return nil
211+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
5+
package baseserver_test
6+
7+
import (
8+
"fmt"
9+
"github.com/gitpod-io/gitpod/common-go/baseserver"
10+
"github.com/stretchr/testify/require"
11+
"net/http"
12+
"testing"
13+
"time"
14+
)
15+
16+
func TestServer_StartStop(t *testing.T) {
17+
// We don't use the helper NewForTests, because we want to control stopping ourselves.
18+
srv, err := baseserver.New("server_test")
19+
require.NoError(t, err)
20+
21+
go func() {
22+
require.NoError(t, srv.ListenAndServe())
23+
}()
24+
25+
baseserver.WaitForServerToBeReachable(t, srv, 3*time.Second)
26+
require.NoError(t, srv.Close())
27+
}
28+
29+
func TestServer_ServesReady(t *testing.T) {
30+
srv := baseserver.NewForTests(t)
31+
32+
go func(t *testing.T) {
33+
require.NoError(t, srv.ListenAndServe())
34+
}(t)
35+
36+
baseserver.WaitForServerToBeReachable(t, srv, 3*time.Second)
37+
38+
readyUR := fmt.Sprintf("%s/ready", srv.HTTPAddress())
39+
resp, err := http.Get(readyUR)
40+
require.NoError(t, err)
41+
require.Equal(t, http.StatusOK, resp.StatusCode)
42+
}

0 commit comments

Comments
 (0)