Skip to content

Commit e82fe80

Browse files
mustard-mhroboquat
authored andcommitted
[public-api] add workspaceStatus stream rpc
1 parent 40c1d2e commit e82fe80

File tree

10 files changed

+940
-423
lines changed

10 files changed

+940
-423
lines changed

components/public-api-server/pkg/apiv1/workspace.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (s *WorkspaceService) GetWorkspace(ctx context.Context, req *connect.Reques
5151
return nil, proxy.ConvertError(err)
5252
}
5353

54-
instance, err := convertWorkspaceInstance(workspace)
54+
instance, err := convertWorkspaceInstance(workspace.LatestInstance, workspace.Workspace.Shareable)
5555
if err != nil {
5656
logger.WithError(err).Error("Failed to convert workspace instance.")
5757
instance = &v1.WorkspaceInstance{}
@@ -77,6 +77,52 @@ func (s *WorkspaceService) GetWorkspace(ctx context.Context, req *connect.Reques
7777
}), nil
7878
}
7979

80+
func (s *WorkspaceService) StreamWorkspaceStatus(ctx context.Context, req *connect.Request[v1.StreamWorkspaceStatusRequest], stream *connect.ServerStream[v1.StreamWorkspaceStatusResponse]) error {
81+
workspaceID, err := validateWorkspaceID(req.Msg.GetWorkspaceId())
82+
if err != nil {
83+
return err
84+
}
85+
86+
logger := ctxlogrus.Extract(ctx).WithField("workspace_id", workspaceID)
87+
88+
conn, err := getConnection(ctx, s.connectionPool)
89+
if err != nil {
90+
return err
91+
}
92+
93+
workspace, err := conn.GetWorkspace(ctx, workspaceID)
94+
if err != nil {
95+
logger.WithError(err).Error("Failed to get workspace.")
96+
return proxy.ConvertError(err)
97+
}
98+
99+
if workspace.LatestInstance == nil {
100+
logger.WithError(err).Error("Failed to get latest instance.")
101+
return connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("instance not found"))
102+
}
103+
104+
ch, err := conn.InstanceUpdates(ctx, workspace.LatestInstance.ID)
105+
if err != nil {
106+
logger.WithError(err).Error("Failed to get workspace instance updates.")
107+
return proxy.ConvertError(err)
108+
}
109+
110+
for update := range ch {
111+
instance, err := convertWorkspaceInstance(update, workspace.Workspace.Shareable)
112+
if err != nil {
113+
logger.WithError(err).Error("Failed to convert workspace instance.")
114+
return proxy.ConvertError(err)
115+
}
116+
_ = stream.Send(&v1.StreamWorkspaceStatusResponse{
117+
Result: &v1.WorkspaceStatus{
118+
Instance: instance,
119+
},
120+
})
121+
}
122+
123+
return nil
124+
}
125+
80126
func (s *WorkspaceService) GetOwnerToken(ctx context.Context, req *connect.Request[v1.GetOwnerTokenRequest]) (*connect.Response[v1.GetOwnerTokenResponse], error) {
81127
workspaceID, err := validateWorkspaceID(req.Msg.GetWorkspaceId())
82128
if err != nil {
@@ -230,7 +276,7 @@ func getLimitFromPagination(pagination *v1.Pagination) (int, error) {
230276

231277
// convertWorkspaceInfo convers a "protocol workspace" to a "public API workspace". Returns gRPC errors if things go wrong.
232278
func convertWorkspaceInfo(input *protocol.WorkspaceInfo) (*v1.Workspace, error) {
233-
instance, err := convertWorkspaceInstance(input)
279+
instance, err := convertWorkspaceInstance(input.LatestInstance, input.Workspace.Shareable)
234280
if err != nil {
235281
return nil, err
236282
}
@@ -252,8 +298,7 @@ func convertWorkspaceInfo(input *protocol.WorkspaceInfo) (*v1.Workspace, error)
252298
}, nil
253299
}
254300

255-
func convertWorkspaceInstance(input *protocol.WorkspaceInfo) (*v1.WorkspaceInstance, error) {
256-
wsi := input.LatestInstance
301+
func convertWorkspaceInstance(wsi *protocol.WorkspaceInstance, shareable bool) (*v1.WorkspaceInstance, error) {
257302
if wsi == nil {
258303
return nil, nil
259304
}
@@ -292,7 +337,7 @@ func convertWorkspaceInstance(input *protocol.WorkspaceInfo) (*v1.WorkspaceInsta
292337
}
293338

294339
var admissionLevel v1.AdmissionLevel
295-
if input.Workspace.Shareable {
340+
if shareable {
296341
admissionLevel = v1.AdmissionLevel_ADMISSION_LEVEL_EVERYONE
297342
} else {
298343
admissionLevel = v1.AdmissionLevel_ADMISSION_LEVEL_OWNER_ONLY

components/public-api-server/pkg/apiv1/workspace_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,120 @@ func TestWorkspaceService_ListWorkspaces(t *testing.T) {
349349
}
350350
}
351351

352+
func TestWorkspaceService_StreamWorkspaceStatus(t *testing.T) {
353+
const (
354+
workspaceID = "easycz-seer-xl8o1zacpyw"
355+
instanceID = "f2effcfd-3ddb-4187-b584-256e88a42442"
356+
ownerToken = "some-owner-token"
357+
)
358+
359+
t.Run("not found when workspace does not exist", func(t *testing.T) {
360+
serverMock, client := setupWorkspacesService(t)
361+
362+
serverMock.EXPECT().GetWorkspace(gomock.Any(), workspaceID).Return(nil, &jsonrpc2.Error{
363+
Code: 404,
364+
Message: "not found",
365+
})
366+
367+
resp, _ := client.StreamWorkspaceStatus(context.Background(), connect.NewRequest(&v1.StreamWorkspaceStatusRequest{
368+
WorkspaceId: workspaceID,
369+
}))
370+
371+
resp.Receive()
372+
373+
require.Error(t, resp.Err())
374+
require.Equal(t, connect.CodeNotFound, connect.CodeOf(resp.Err()))
375+
})
376+
377+
t.Run("returns a workspace status", func(t *testing.T) {
378+
serverMock, client := setupWorkspacesService(t)
379+
380+
serverMock.EXPECT().GetWorkspace(gomock.Any(), workspaceID).Return(&workspaceTestData[0].Protocol, nil)
381+
serverMock.EXPECT().InstanceUpdates(gomock.Any(), instanceID).DoAndReturn(func(ctx context.Context, instanceID string) (<-chan *protocol.WorkspaceInstance, error) {
382+
ch := make(chan *protocol.WorkspaceInstance)
383+
go func() {
384+
ch <- workspaceTestData[0].Protocol.LatestInstance
385+
}()
386+
go func() {
387+
<-ctx.Done()
388+
close(ch)
389+
}()
390+
return ch, nil
391+
})
392+
393+
ctx, cancel := context.WithCancel(context.Background())
394+
resp, err := client.StreamWorkspaceStatus(ctx, connect.NewRequest(&v1.StreamWorkspaceStatusRequest{
395+
WorkspaceId: workspaceID,
396+
}))
397+
398+
require.NoError(t, err)
399+
400+
resp.Receive()
401+
cancel()
402+
403+
requireEqualProto(t, workspaceTestData[0].API.Status, resp.Msg().Result)
404+
})
405+
}
406+
407+
func TestClientServerStreamInterceptor(t *testing.T) {
408+
testInterceptor := &TestInterceptor{
409+
expectedToken: "auth-token",
410+
t: t,
411+
}
412+
413+
ctrl := gomock.NewController(t)
414+
t.Cleanup(ctrl.Finish)
415+
416+
serverMock := protocol.NewMockAPIInterface(ctrl)
417+
418+
svc := NewWorkspaceService(&FakeServerConnPool{
419+
api: serverMock,
420+
})
421+
422+
_, handler := v1connect.NewWorkspacesServiceHandler(svc, connect.WithInterceptors(auth.NewServerInterceptor(), testInterceptor))
423+
424+
srv := httptest.NewServer(handler)
425+
t.Cleanup(srv.Close)
426+
427+
client := v1connect.NewWorkspacesServiceClient(http.DefaultClient, srv.URL, connect.WithInterceptors(
428+
auth.NewClientInterceptor("auth-token"),
429+
testInterceptor,
430+
))
431+
432+
resp, _ := client.StreamWorkspaceStatus(context.Background(), connect.NewRequest(&v1.StreamWorkspaceStatusRequest{
433+
WorkspaceId: "",
434+
}))
435+
436+
resp.Close()
437+
}
438+
439+
type TestInterceptor struct {
440+
expectedToken string
441+
t *testing.T
442+
}
443+
444+
func (ti *TestInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
445+
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
446+
return next(ctx, req)
447+
}
448+
}
449+
450+
func (ti *TestInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
451+
return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
452+
token, _ := auth.TokenFromContext(ctx)
453+
require.Equal(ti.t, ti.expectedToken, token.Value)
454+
return next(ctx, spec)
455+
}
456+
}
457+
458+
func (ti *TestInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
459+
return func(ctx context.Context, conn connect.StreamingHandlerConn) error {
460+
token, _ := auth.TokenFromContext(ctx)
461+
require.Equal(ti.t, ti.expectedToken, token.Value)
462+
return next(ctx, conn)
463+
}
464+
}
465+
352466
type workspaceTestDataEntry struct {
353467
Name string
354468
Protocol protocol.WorkspaceInfo

components/public-api-server/pkg/auth/middleware.go

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,50 @@ import (
1111
"github.com/bufbuild/connect-go"
1212
)
1313

14-
// NewServerInterceptor creates a server-side interceptor which validates that an incoming request contains a Bearer Authorization header
15-
func NewServerInterceptor() connect.UnaryInterceptorFunc {
16-
interceptor := func(next connect.UnaryFunc) connect.UnaryFunc {
17-
return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
14+
type AuthInterceptor struct {
15+
accessToken string
16+
}
1817

19-
if req.Spec().IsClient {
20-
return next(ctx, req)
21-
}
18+
func (a *AuthInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
19+
return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
20+
if req.Spec().IsClient {
21+
ctx = TokenToContext(ctx, NewAccessToken(a.accessToken))
22+
23+
req.Header().Add(authorizationHeaderKey, bearerPrefix+a.accessToken)
24+
return next(ctx, req)
25+
}
2226

23-
token, err := tokenFromRequest(ctx, req)
24-
if err != nil {
25-
return nil, err
26-
}
27+
token, err := tokenFromRequest(ctx, req)
28+
if err != nil {
29+
return nil, err
30+
}
2731

28-
return next(TokenToContext(ctx, token), req)
29-
})
32+
return next(TokenToContext(ctx, token), req)
33+
})
34+
}
35+
36+
func (a *AuthInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
37+
return func(ctx context.Context, s connect.Spec) connect.StreamingClientConn {
38+
ctx = TokenToContext(ctx, NewAccessToken(a.accessToken))
39+
conn := next(ctx, s)
40+
conn.RequestHeader().Add(authorizationHeaderKey, bearerPrefix+a.accessToken)
41+
return conn
3042
}
43+
}
3144

32-
return connect.UnaryInterceptorFunc(interceptor)
45+
func (a *AuthInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
46+
return func(ctx context.Context, conn connect.StreamingHandlerConn) error {
47+
token, err := tokenFromConn(ctx, conn)
48+
if err != nil {
49+
return err
50+
}
51+
return next(TokenToContext(ctx, token), conn)
52+
}
53+
}
54+
55+
// NewServerInterceptor creates a server-side interceptor which validates that an incoming request contains a Bearer Authorization header
56+
func NewServerInterceptor() connect.Interceptor {
57+
return &AuthInterceptor{}
3358
}
3459

3560
func tokenFromRequest(ctx context.Context, req connect.AnyRequest) (Token, error) {
@@ -48,21 +73,25 @@ func tokenFromRequest(ctx context.Context, req connect.AnyRequest) (Token, error
4873
return Token{}, connect.NewError(connect.CodeUnauthenticated, fmt.Errorf("No access token or cookie credentials available on request."))
4974
}
5075

51-
// NewClientInterceptor creates a client-side interceptor which injects token as a Bearer Authorization header
52-
func NewClientInterceptor(accessToken string) connect.UnaryInterceptorFunc {
53-
interceptor := func(next connect.UnaryFunc) connect.UnaryFunc {
54-
return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
55-
56-
if !req.Spec().IsClient {
57-
return next(ctx, req)
58-
}
76+
func tokenFromConn(ctx context.Context, conn connect.StreamingHandlerConn) (Token, error) {
77+
headers := conn.RequestHeader()
5978

60-
ctx = TokenToContext(ctx, NewAccessToken(accessToken))
79+
bearerToken, err := BearerTokenFromHeaders(headers)
80+
if err == nil {
81+
return NewAccessToken(bearerToken), nil
82+
}
6183

62-
req.Header().Add(authorizationHeaderKey, bearerPrefix+accessToken)
63-
return next(ctx, req)
64-
})
84+
cookie := conn.RequestHeader().Get("Cookie")
85+
if cookie != "" {
86+
return NewCookieToken(cookie), nil
6587
}
6688

67-
return connect.UnaryInterceptorFunc(interceptor)
89+
return Token{}, connect.NewError(connect.CodeUnauthenticated, fmt.Errorf("No access token or cookie credentials available on request."))
90+
}
91+
92+
// NewClientInterceptor creates a client-side interceptor which injects token as a Bearer Authorization header
93+
func NewClientInterceptor(accessToken string) connect.Interceptor {
94+
return &AuthInterceptor{
95+
accessToken: accessToken,
96+
}
6897
}

components/public-api-server/pkg/auth/middleware_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ func TestNewServerInterceptor(t *testing.T) {
6060
request.Header().Add(header.Key, header.Value)
6161
}
6262

63-
resp, err := NewServerInterceptor()(handler)(ctx, request)
63+
interceptor := NewServerInterceptor()
64+
resp, err := interceptor.WrapUnary(handler)(ctx, request)
6465

6566
require.Equal(t, s.ExpectedError, err)
6667
if err == nil {

components/public-api/gitpod/experimental/v1/workspaces.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ service WorkspacesService {
1616
// GetWorkspace returns a single workspace.
1717
rpc GetWorkspace(GetWorkspaceRequest) returns (GetWorkspaceResponse) {}
1818

19+
// StreamWorkspaceStatus returns workspace status once it changed.
20+
rpc StreamWorkspaceStatus(StreamWorkspaceStatusRequest) returns (stream StreamWorkspaceStatusResponse) {}
21+
1922
// GetOwnerToken returns an owner token.
2023
rpc GetOwnerToken(GetOwnerTokenRequest) returns (GetOwnerTokenResponse) {}
2124

@@ -53,6 +56,14 @@ message GetWorkspaceResponse {
5356
Workspace result = 1;
5457
}
5558

59+
message StreamWorkspaceStatusRequest {
60+
string workspace_id = 1;
61+
}
62+
63+
message StreamWorkspaceStatusResponse {
64+
WorkspaceStatus result = 1;
65+
}
66+
5667
message GetOwnerTokenRequest {
5768
string workspace_id = 1;
5869
}

0 commit comments

Comments
 (0)