Skip to content

Commit 1b2948f

Browse files
Support empty queries
1 parent 63b22ca commit 1b2948f

File tree

9 files changed

+498
-46
lines changed

9 files changed

+498
-46
lines changed

common/archiver/filestore/queryParser.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,18 @@ func NewQueryParser() QueryParser {
8282
}
8383

8484
func (p *queryParser) Parse(query string) (*parsedQuery, error) {
85+
parsedQuery := &parsedQuery{
86+
earliestCloseTime: time.Time{},
87+
latestCloseTime: time.Now().UTC(),
88+
}
89+
if strings.TrimSpace(query) == "" {
90+
return parsedQuery, nil
91+
}
8592
stmt, err := sqlparser.Parse(fmt.Sprintf(queryTemplate, query))
8693
if err != nil {
8794
return nil, err
8895
}
8996
whereExpr := stmt.(*sqlparser.Select).Where.Expr
90-
parsedQuery := &parsedQuery{
91-
earliestCloseTime: time.Time{},
92-
latestCloseTime: time.Now().UTC(),
93-
}
9497
if err := p.convertWhereExpr(whereExpr, parsedQuery); err != nil {
9598
return nil, err
9699
}

common/archiver/filestore/visibilityArchiver_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/stretchr/testify/suite"
3838
commonpb "go.temporal.io/api/common/v1"
3939
enumspb "go.temporal.io/api/enums/v1"
40+
"go.temporal.io/api/serviceerror"
4041
workflowpb "go.temporal.io/api/workflow/v1"
4142

4243
"go.temporal.io/server/common/searchattribute"
@@ -510,6 +511,73 @@ func (s *visibilityArchiverSuite) TestArchiveAndQuery() {
510511
s.Equal(ei, executions[1])
511512
}
512513

514+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_InvalidNamespace() {
515+
URI := s.testArchivalURI
516+
517+
visibilityArchiver := s.newTestVisibilityArchiver()
518+
mockParser := NewMockQueryParser(s.controller)
519+
mockParser.EXPECT().Parse(gomock.Any()).Return(&parsedQuery{
520+
earliestCloseTime: time.Unix(0, 10),
521+
latestCloseTime: time.Unix(0, 10001),
522+
status: toWorkflowExecutionStatusPtr(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED),
523+
}, nil).AnyTimes()
524+
visibilityArchiver.queryParser = mockParser
525+
req := &archiver.QueryVisibilityRequest{
526+
NamespaceID: "",
527+
PageSize: 1,
528+
NextPageToken: nil,
529+
Query: "",
530+
}
531+
_, err := visibilityArchiver.Query(context.Background(), URI, req, searchattribute.TestNameTypeMap)
532+
533+
var svcErr *serviceerror.InvalidArgument
534+
535+
s.ErrorAs(err, &svcErr)
536+
}
537+
538+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_ZeroPageSize() {
539+
visibilityArchiver := s.newTestVisibilityArchiver()
540+
541+
req := &archiver.QueryVisibilityRequest{
542+
NamespaceID: testNamespaceID,
543+
PageSize: 0,
544+
NextPageToken: nil,
545+
Query: "",
546+
}
547+
_, err := visibilityArchiver.Query(context.Background(), s.testArchivalURI, req, searchattribute.TestNameTypeMap)
548+
549+
var svcErr *serviceerror.InvalidArgument
550+
551+
s.ErrorAs(err, &svcErr)
552+
}
553+
554+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_Pagination() {
555+
dir := testutils.MkdirTemp(s.T(), "", "TestQuery_EmptyQuery_Pagination")
556+
557+
visibilityArchiver := s.newTestVisibilityArchiver()
558+
URI, err := archiver.NewURI("file://" + dir)
559+
s.NoError(err)
560+
for _, record := range s.visibilityRecords {
561+
err := visibilityArchiver.Archive(context.Background(), URI, record)
562+
s.NoError(err)
563+
}
564+
565+
request := &archiver.QueryVisibilityRequest{
566+
NamespaceID: testNamespaceID,
567+
PageSize: 1,
568+
Query: "",
569+
}
570+
var executions []*workflowpb.WorkflowExecutionInfo
571+
for len(executions) == 0 || request.NextPageToken != nil {
572+
response, err := visibilityArchiver.Query(context.Background(), URI, request, searchattribute.TestNameTypeMap)
573+
s.NoError(err)
574+
s.NotNil(response)
575+
executions = append(executions, response.Executions...)
576+
request.NextPageToken = response.NextPageToken
577+
}
578+
s.Len(executions, 4)
579+
}
580+
513581
func (s *visibilityArchiverSuite) newTestVisibilityArchiver() *visibilityArchiver {
514582
config := &config.FilestoreArchiver{
515583
FileMode: testFileModeStr,

common/archiver/gcloud/visibilityArchiver.go

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"errors"
3030
"fmt"
3131
"path/filepath"
32+
"strings"
3233
"time"
3334

3435
"go.temporal.io/api/serviceerror"
@@ -170,6 +171,10 @@ func (v *visibilityArchiver) Query(
170171
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrInvalidQueryVisibilityRequest.Error()}
171172
}
172173

174+
if strings.TrimSpace(request.Query) == "" {
175+
return v.queryAll(ctx, URI, request, saTypeMap)
176+
}
177+
173178
parsedQuery, err := v.queryParser.Parse(request.Query)
174179
if err != nil {
175180
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
@@ -194,26 +199,51 @@ func (v *visibilityArchiver) Query(
194199

195200
func (v *visibilityArchiver) query(
196201
ctx context.Context,
197-
URI archiver.URI,
202+
uri archiver.URI,
198203
request *queryVisibilityRequest,
199204
saTypeMap searchattribute.NameTypeMap,
200205
) (*archiver.QueryVisibilityResponse, error) {
201-
202-
token := new(queryVisibilityToken)
203-
if request.nextPageToken != nil {
204-
var err error
205-
token, err = deserializeQueryVisibilityToken(request.nextPageToken)
206-
if err != nil {
207-
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrNextPageTokenCorrupted.Error()}
208-
}
209-
}
210-
211-
var prefix = constructVisibilityFilenamePrefix(request.namespaceID, indexKeyCloseTimeout)
206+
prefix := constructVisibilityFilenamePrefix(request.namespaceID, indexKeyCloseTimeout)
212207
if !request.parsedQuery.closeTime.IsZero() {
213-
prefix = constructTimeBasedSearchKey(request.namespaceID, indexKeyCloseTimeout, request.parsedQuery.closeTime, *request.parsedQuery.searchPrecision)
208+
prefix = constructTimeBasedSearchKey(
209+
request.namespaceID,
210+
indexKeyCloseTimeout,
211+
request.parsedQuery.closeTime,
212+
*request.parsedQuery.searchPrecision,
213+
)
214214
}
215+
215216
if !request.parsedQuery.startTime.IsZero() {
216-
prefix = constructTimeBasedSearchKey(request.namespaceID, indexKeyStartTimeout, request.parsedQuery.startTime, *request.parsedQuery.searchPrecision)
217+
prefix = constructTimeBasedSearchKey(
218+
request.namespaceID,
219+
indexKeyStartTimeout,
220+
request.parsedQuery.startTime,
221+
*request.parsedQuery.searchPrecision,
222+
)
223+
}
224+
225+
return v.queryPrefix(ctx, uri, request, saTypeMap, prefix)
226+
}
227+
228+
func (v *visibilityArchiver) queryAll(
229+
ctx context.Context,
230+
URI archiver.URI,
231+
request *archiver.QueryVisibilityRequest,
232+
saTypeMap searchattribute.NameTypeMap,
233+
) (*archiver.QueryVisibilityResponse, error) {
234+
235+
return v.queryPrefix(ctx, URI, &queryVisibilityRequest{
236+
namespaceID: request.NamespaceID,
237+
pageSize: request.PageSize,
238+
nextPageToken: request.NextPageToken,
239+
parsedQuery: &parsedQuery{},
240+
}, saTypeMap, request.NamespaceID)
241+
}
242+
243+
func (v *visibilityArchiver) queryPrefix(ctx context.Context, uri archiver.URI, request *queryVisibilityRequest, saTypeMap searchattribute.NameTypeMap, prefix string) (*archiver.QueryVisibilityResponse, error) {
244+
token, err := v.parseToken(request.nextPageToken)
245+
if err != nil {
246+
return nil, err
217247
}
218248

219249
filters := make([]connector.Precondition, 0)
@@ -229,14 +259,14 @@ func (v *visibilityArchiver) query(
229259
filters = append(filters, newWorkflowIDPrecondition(hash(*request.parsedQuery.workflowType)))
230260
}
231261

232-
filenames, completed, currentCursorPos, err := v.gcloudStorage.QueryWithFilters(ctx, URI, prefix, request.pageSize, token.Offset, filters)
262+
filenames, completed, currentCursorPos, err := v.gcloudStorage.QueryWithFilters(ctx, uri, prefix, request.pageSize, token.Offset, filters)
233263
if err != nil {
234264
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
235265
}
236266

237267
response := &archiver.QueryVisibilityResponse{}
238268
for _, file := range filenames {
239-
encodedRecord, err := v.gcloudStorage.Get(ctx, URI, fmt.Sprintf("%s/%s", request.namespaceID, filepath.Base(file)))
269+
encodedRecord, err := v.gcloudStorage.Get(ctx, uri, fmt.Sprintf("%s/%s", request.namespaceID, filepath.Base(file)))
240270
if err != nil {
241271
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
242272
}
@@ -267,6 +297,18 @@ func (v *visibilityArchiver) query(
267297
return response, nil
268298
}
269299

300+
func (v *visibilityArchiver) parseToken(nextPageToken []byte) (*queryVisibilityToken, error) {
301+
token := new(queryVisibilityToken)
302+
if nextPageToken != nil {
303+
var err error
304+
token, err = deserializeQueryVisibilityToken(nextPageToken)
305+
if err != nil {
306+
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrNextPageTokenCorrupted.Error()}
307+
}
308+
}
309+
return token, nil
310+
}
311+
270312
// ValidateURI is used to define what a valid URI for an implementation is.
271313
func (v *visibilityArchiver) ValidateURI(URI archiver.URI) (err error) {
272314
ctx, cancel := context.WithTimeout(context.Background(), timeoutInSeconds*time.Second)

common/archiver/gcloud/visibilityArchiver_test.go

Lines changed: 131 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
"github.com/stretchr/testify/require"
3535
"github.com/stretchr/testify/suite"
3636
enumspb "go.temporal.io/api/enums/v1"
37+
"go.temporal.io/api/serviceerror"
38+
"go.temporal.io/api/workflow/v1"
3739

3840
"go.temporal.io/server/common/searchattribute"
3941

@@ -47,8 +49,11 @@ import (
4749
)
4850

4951
const (
50-
testWorkflowTypeName = "test-workflow-type"
51-
exampleVisibilityRecord = `{"namespaceId":"test-namespace-id","namespace":"test-namespace","workflowId":"test-workflow-id","runId":"test-run-id","workflowTypeName":"test-workflow-type","startTime":"2020-02-05T09:56:14.804475Z","closeTime":"2020-02-05T09:56:15.946478Z","status":"Completed","historyLength":36,"memo":null,"searchAttributes":null,"historyArchivalUri":"gs://my-bucket-cad/temporal_archival/development"}`
52+
testWorkflowTypeName = "test-workflow-type"
53+
exampleVisibilityRecord = `{"namespaceId":"test-namespace-id","namespace":"test-namespace","workflowId":"test-workflow-id","runId":"test-run-id","workflowTypeName":"test-workflow-type","startTime":"2020-02-05T09:56:14.804475Z","closeTime":"2020-02-05T09:56:15.946478Z","status":"Completed","historyLength":36,"memo":null,"searchAttributes":null,"historyArchivalUri":"gs://my-bucket-cad/temporal_archival/development"}`
54+
exampleVisibilityRecord2 = `{"namespaceId":"test-namespace-id","namespace":"test-namespace",
55+
"workflowId":"test-workflow-id2","runId":"test-run-id","workflowTypeName":"test-workflow-type",
56+
"startTime":"2020-02-05T09:56:14.804475Z","closeTime":"2020-02-05T09:56:15.946478Z","status":"Completed","historyLength":36,"memo":null,"searchAttributes":null,"historyArchivalUri":"gs://my-bucket-cad/temporal_archival/development"}`
5257
)
5358

5459
func (s *visibilityArchiverSuite) SetupTest() {
@@ -228,9 +233,11 @@ func (s *visibilityArchiverSuite) TestQuery_Fail_InvalidToken() {
228233
mockParser := NewMockQueryParser(s.controller)
229234
startTime, _ := time.Parse(time.RFC3339, "2019-10-04T11:00:00+00:00")
230235
closeTime := startTime.Add(time.Hour)
236+
precision := PrecisionDay
231237
mockParser.EXPECT().Parse(gomock.Any()).Return(&parsedQuery{
232-
closeTime: closeTime,
233-
startTime: startTime,
238+
closeTime: closeTime,
239+
startTime: startTime,
240+
searchPrecision: &precision,
234241
}, nil)
235242
visibilityArchiver.queryParser = mockParser
236243
request := &archiver.QueryVisibilityRequest{
@@ -257,7 +264,7 @@ func (s *visibilityArchiverSuite) TestQuery_Success_NoNextPageToken() {
257264
s.NoError(err)
258265

259266
mockParser := NewMockQueryParser(s.controller)
260-
dayPrecision := string("Day")
267+
dayPrecision := "Day"
261268
closeTime, _ := time.Parse(time.RFC3339, "2019-10-04T11:00:00+00:00")
262269
mockParser.EXPECT().Parse(gomock.Any()).Return(&parsedQuery{
263270
closeTime: closeTime,
@@ -339,3 +346,122 @@ func (s *visibilityArchiverSuite) TestQuery_Success_SmallPageSize() {
339346
s.NoError(err)
340347
s.Equal(ei, response.Executions[0])
341348
}
349+
350+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_InvalidNamespace() {
351+
URI, err := archiver.NewURI("gs://my-bucket-cad/temporal_archival/visibility")
352+
s.NoError(err)
353+
storageWrapper := connector.NewMockClient(s.controller)
354+
storageWrapper.EXPECT().Exist(gomock.Any(), URI, gomock.Any()).Return(false, nil)
355+
arc := newVisibilityArchiver(s.container, storageWrapper)
356+
req := &archiver.QueryVisibilityRequest{
357+
NamespaceID: "",
358+
PageSize: 1,
359+
NextPageToken: nil,
360+
Query: "",
361+
}
362+
_, err = arc.Query(context.Background(), URI, req, searchattribute.TestNameTypeMap)
363+
364+
var svcErr *serviceerror.InvalidArgument
365+
366+
s.ErrorAs(err, &svcErr)
367+
}
368+
369+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_ZeroPageSize() {
370+
URI, err := archiver.NewURI("gs://my-bucket-cad/temporal_archival/visibility")
371+
s.NoError(err)
372+
storageWrapper := connector.NewMockClient(s.controller)
373+
storageWrapper.EXPECT().Exist(gomock.Any(), URI, gomock.Any()).Return(false, nil)
374+
arc := newVisibilityArchiver(s.container, storageWrapper)
375+
376+
req := &archiver.QueryVisibilityRequest{
377+
NamespaceID: testNamespaceID,
378+
PageSize: 0,
379+
NextPageToken: nil,
380+
Query: "",
381+
}
382+
_, err = arc.Query(context.Background(), URI, req, searchattribute.TestNameTypeMap)
383+
384+
var svcErr *serviceerror.InvalidArgument
385+
386+
s.ErrorAs(err, &svcErr)
387+
}
388+
389+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_Pagination() {
390+
URI, err := archiver.NewURI("gs://my-bucket-cad/temporal_archival/visibility")
391+
s.NoError(err)
392+
storageWrapper := connector.NewMockClient(s.controller)
393+
storageWrapper.EXPECT().Exist(gomock.Any(), URI, gomock.Any()).Return(true, nil).Times(2)
394+
storageWrapper.EXPECT().QueryWithFilters(
395+
gomock.Any(),
396+
URI,
397+
gomock.Any(),
398+
1,
399+
0,
400+
gomock.Any(),
401+
).Return(
402+
[]string{"closeTimeout_2020-02-05T09:56:14Z_test-workflow-id_MobileOnlyWorkflow::processMobileOnly_test-run-id.visibility"},
403+
false,
404+
1,
405+
nil,
406+
)
407+
storageWrapper.EXPECT().QueryWithFilters(
408+
gomock.Any(),
409+
URI,
410+
gomock.Any(),
411+
1,
412+
1,
413+
gomock.Any(),
414+
).Return(
415+
[]string{"closeTimeout_2020-02-05T09:56:14Z_test-workflow-id2_MobileOnlyWorkflow::processMobileOnly_test-run" +
416+
"-id.visibility"},
417+
true,
418+
2,
419+
nil,
420+
)
421+
storageWrapper.EXPECT().Get(
422+
gomock.Any(),
423+
URI,
424+
"test-namespace-id/closeTimeout_2020-02-05T09:56:14Z_test-workflow-id_MobileOnlyWorkflow::processMobileOnly_test-run-id.visibility",
425+
).Return([]byte(exampleVisibilityRecord), nil)
426+
storageWrapper.EXPECT().Get(gomock.Any(), URI,
427+
"test-namespace-id/closeTimeout_2020-02-05T09:56:14Z_test-workflow-id2_MobileOnlyWorkflow"+
428+
"::processMobileOnly_test-run-id.visibility").Return([]byte(exampleVisibilityRecord2), nil)
429+
430+
arc := newVisibilityArchiver(s.container, storageWrapper)
431+
432+
response := &archiver.QueryVisibilityResponse{
433+
Executions: nil,
434+
NextPageToken: nil,
435+
}
436+
437+
limit := 10
438+
executions := make(map[string]*workflow.WorkflowExecutionInfo, limit)
439+
440+
numPages := 2
441+
for i := 0; i < numPages; i++ {
442+
req := &archiver.QueryVisibilityRequest{
443+
NamespaceID: testNamespaceID,
444+
PageSize: 1,
445+
NextPageToken: response.NextPageToken,
446+
Query: "",
447+
}
448+
response, err = arc.Query(context.Background(), URI, req, searchattribute.TestNameTypeMap)
449+
s.NoError(err)
450+
s.NotNil(response)
451+
s.Len(response.Executions, 1)
452+
453+
s.Equal(
454+
i == numPages-1,
455+
response.NextPageToken == nil,
456+
"should have no next page token on the last iteration",
457+
)
458+
459+
for _, execution := range response.Executions {
460+
key := execution.Execution.GetWorkflowId() +
461+
"/" + execution.Execution.GetRunId() +
462+
"/" + execution.CloseTime.String()
463+
executions[key] = execution
464+
}
465+
}
466+
s.Len(executions, 2, "there should be exactly 2 unique executions")
467+
}

0 commit comments

Comments
 (0)