Skip to content

Commit f99059b

Browse files
Support empty queries
1 parent 19a30b9 commit f99059b

File tree

9 files changed

+501
-51
lines changed

9 files changed

+501
-51
lines changed

common/archiver/filestore/queryParser.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,18 @@ func NewQueryParser() QueryParser {
8282
}
8383

8484
func (p *queryParser) Parse(query string) (*parsedQuery, error) {
85+
parsedQuery := &parsedQuery{
86+
earliestCloseTime: time.Time{},
87+
latestCloseTime: time.Now().UTC(),
88+
}
89+
if strings.TrimSpace(query) == "" {
90+
return parsedQuery, nil
91+
}
8592
stmt, err := sqlparser.Parse(fmt.Sprintf(queryTemplate, query))
8693
if err != nil {
8794
return nil, err
8895
}
8996
whereExpr := stmt.(*sqlparser.Select).Where.Expr
90-
parsedQuery := &parsedQuery{
91-
earliestCloseTime: time.Time{},
92-
latestCloseTime: time.Now().UTC(),
93-
}
9497
if err := p.convertWhereExpr(whereExpr, parsedQuery); err != nil {
9598
return nil, err
9699
}

common/archiver/filestore/visibilityArchiver_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/stretchr/testify/suite"
3838
commonpb "go.temporal.io/api/common/v1"
3939
enumspb "go.temporal.io/api/enums/v1"
40+
"go.temporal.io/api/serviceerror"
4041
workflowpb "go.temporal.io/api/workflow/v1"
4142

4243
"go.temporal.io/server/common/searchattribute"
@@ -510,6 +511,73 @@ func (s *visibilityArchiverSuite) TestArchiveAndQuery() {
510511
s.Equal(ei, executions[1])
511512
}
512513

514+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_InvalidNamespace() {
515+
URI := s.testArchivalURI
516+
517+
visibilityArchiver := s.newTestVisibilityArchiver()
518+
mockParser := NewMockQueryParser(s.controller)
519+
mockParser.EXPECT().Parse(gomock.Any()).Return(&parsedQuery{
520+
earliestCloseTime: time.Unix(0, 10),
521+
latestCloseTime: time.Unix(0, 10001),
522+
status: toWorkflowExecutionStatusPtr(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED),
523+
}, nil).AnyTimes()
524+
visibilityArchiver.queryParser = mockParser
525+
req := &archiver.QueryVisibilityRequest{
526+
NamespaceID: "",
527+
PageSize: 1,
528+
NextPageToken: nil,
529+
Query: "",
530+
}
531+
_, err := visibilityArchiver.Query(context.Background(), URI, req, searchattribute.TestNameTypeMap)
532+
533+
var svcErr *serviceerror.InvalidArgument
534+
535+
s.ErrorAs(err, &svcErr)
536+
}
537+
538+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_ZeroPageSize() {
539+
visibilityArchiver := s.newTestVisibilityArchiver()
540+
541+
req := &archiver.QueryVisibilityRequest{
542+
NamespaceID: testNamespaceID,
543+
PageSize: 0,
544+
NextPageToken: nil,
545+
Query: "",
546+
}
547+
_, err := visibilityArchiver.Query(context.Background(), s.testArchivalURI, req, searchattribute.TestNameTypeMap)
548+
549+
var svcErr *serviceerror.InvalidArgument
550+
551+
s.ErrorAs(err, &svcErr)
552+
}
553+
554+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_Pagination() {
555+
dir := testutils.MkdirTemp(s.T(), "", "TestQuery_EmptyQuery_Pagination")
556+
557+
visibilityArchiver := s.newTestVisibilityArchiver()
558+
URI, err := archiver.NewURI("file://" + dir)
559+
s.NoError(err)
560+
for _, record := range s.visibilityRecords {
561+
err := visibilityArchiver.Archive(context.Background(), URI, record)
562+
s.NoError(err)
563+
}
564+
565+
request := &archiver.QueryVisibilityRequest{
566+
NamespaceID: testNamespaceID,
567+
PageSize: 1,
568+
Query: "",
569+
}
570+
var executions []*workflowpb.WorkflowExecutionInfo
571+
for len(executions) == 0 || request.NextPageToken != nil {
572+
response, err := visibilityArchiver.Query(context.Background(), URI, request, searchattribute.TestNameTypeMap)
573+
s.NoError(err)
574+
s.NotNil(response)
575+
executions = append(executions, response.Executions...)
576+
request.NextPageToken = response.NextPageToken
577+
}
578+
s.Len(executions, 4)
579+
}
580+
513581
func (s *visibilityArchiverSuite) newTestVisibilityArchiver() *visibilityArchiver {
514582
config := &config.FilestoreArchiver{
515583
FileMode: testFileModeStr,

common/archiver/gcloud/visibilityArchiver.go

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"errors"
3030
"fmt"
3131
"path/filepath"
32+
"strings"
3233
"time"
3334

3435
"go.temporal.io/api/serviceerror"
@@ -161,15 +162,14 @@ func (v *visibilityArchiver) Query(
161162
request *archiver.QueryVisibilityRequest,
162163
saTypeMap searchattribute.NameTypeMap,
163164
) (*archiver.QueryVisibilityResponse, error) {
164-
165-
if err := v.ValidateURI(URI); err != nil {
166-
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrInvalidURI.Error()}
167-
}
168-
169165
if err := archiver.ValidateQueryRequest(request); err != nil {
170166
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrInvalidQueryVisibilityRequest.Error()}
171167
}
172168

169+
if strings.TrimSpace(request.Query) == "" {
170+
return v.queryAll(ctx, URI, request, saTypeMap)
171+
}
172+
173173
parsedQuery, err := v.queryParser.Parse(request.Query)
174174
if err != nil {
175175
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
@@ -194,26 +194,55 @@ func (v *visibilityArchiver) Query(
194194

195195
func (v *visibilityArchiver) query(
196196
ctx context.Context,
197-
URI archiver.URI,
197+
uri archiver.URI,
198198
request *queryVisibilityRequest,
199199
saTypeMap searchattribute.NameTypeMap,
200200
) (*archiver.QueryVisibilityResponse, error) {
201+
prefix := constructVisibilityFilenamePrefix(request.namespaceID, indexKeyCloseTimeout)
202+
if !request.parsedQuery.closeTime.IsZero() {
203+
prefix = constructTimeBasedSearchKey(
204+
request.namespaceID,
205+
indexKeyCloseTimeout,
206+
request.parsedQuery.closeTime,
207+
*request.parsedQuery.searchPrecision,
208+
)
209+
}
201210

202-
token := new(queryVisibilityToken)
203-
if request.nextPageToken != nil {
204-
var err error
205-
token, err = deserializeQueryVisibilityToken(request.nextPageToken)
206-
if err != nil {
207-
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrNextPageTokenCorrupted.Error()}
208-
}
211+
if !request.parsedQuery.startTime.IsZero() {
212+
prefix = constructTimeBasedSearchKey(
213+
request.namespaceID,
214+
indexKeyStartTimeout,
215+
request.parsedQuery.startTime,
216+
*request.parsedQuery.searchPrecision,
217+
)
209218
}
210219

211-
var prefix = constructVisibilityFilenamePrefix(request.namespaceID, indexKeyCloseTimeout)
212-
if !request.parsedQuery.closeTime.IsZero() {
213-
prefix = constructTimeBasedSearchKey(request.namespaceID, indexKeyCloseTimeout, request.parsedQuery.closeTime, *request.parsedQuery.searchPrecision)
220+
return v.queryPrefix(ctx, uri, request, saTypeMap, prefix)
221+
}
222+
223+
func (v *visibilityArchiver) queryAll(
224+
ctx context.Context,
225+
URI archiver.URI,
226+
request *archiver.QueryVisibilityRequest,
227+
saTypeMap searchattribute.NameTypeMap,
228+
) (*archiver.QueryVisibilityResponse, error) {
229+
230+
return v.queryPrefix(ctx, URI, &queryVisibilityRequest{
231+
namespaceID: request.NamespaceID,
232+
pageSize: request.PageSize,
233+
nextPageToken: request.NextPageToken,
234+
parsedQuery: &parsedQuery{},
235+
}, saTypeMap, request.NamespaceID)
236+
}
237+
238+
func (v *visibilityArchiver) queryPrefix(ctx context.Context, uri archiver.URI, request *queryVisibilityRequest, saTypeMap searchattribute.NameTypeMap, prefix string) (*archiver.QueryVisibilityResponse, error) {
239+
if err := v.ValidateURI(uri); err != nil {
240+
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrInvalidURI.Error()}
214241
}
215-
if !request.parsedQuery.startTime.IsZero() {
216-
prefix = constructTimeBasedSearchKey(request.namespaceID, indexKeyStartTimeout, request.parsedQuery.startTime, *request.parsedQuery.searchPrecision)
242+
243+
token, err := v.parseToken(request.nextPageToken)
244+
if err != nil {
245+
return nil, err
217246
}
218247

219248
filters := make([]connector.Precondition, 0)
@@ -229,14 +258,14 @@ func (v *visibilityArchiver) query(
229258
filters = append(filters, newWorkflowIDPrecondition(hash(*request.parsedQuery.workflowType)))
230259
}
231260

232-
filenames, completed, currentCursorPos, err := v.gcloudStorage.QueryWithFilters(ctx, URI, prefix, request.pageSize, token.Offset, filters)
261+
filenames, completed, currentCursorPos, err := v.gcloudStorage.QueryWithFilters(ctx, uri, prefix, request.pageSize, token.Offset, filters)
233262
if err != nil {
234263
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
235264
}
236265

237266
response := &archiver.QueryVisibilityResponse{}
238267
for _, file := range filenames {
239-
encodedRecord, err := v.gcloudStorage.Get(ctx, URI, fmt.Sprintf("%s/%s", request.namespaceID, filepath.Base(file)))
268+
encodedRecord, err := v.gcloudStorage.Get(ctx, uri, fmt.Sprintf("%s/%s", request.namespaceID, filepath.Base(file)))
240269
if err != nil {
241270
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
242271
}
@@ -267,6 +296,18 @@ func (v *visibilityArchiver) query(
267296
return response, nil
268297
}
269298

299+
func (v *visibilityArchiver) parseToken(nextPageToken []byte) (*queryVisibilityToken, error) {
300+
token := new(queryVisibilityToken)
301+
if nextPageToken != nil {
302+
var err error
303+
token, err = deserializeQueryVisibilityToken(nextPageToken)
304+
if err != nil {
305+
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrNextPageTokenCorrupted.Error()}
306+
}
307+
}
308+
return token, nil
309+
}
310+
270311
// ValidateURI is used to define what a valid URI for an implementation is.
271312
func (v *visibilityArchiver) ValidateURI(URI archiver.URI) (err error) {
272313
ctx, cancel := context.WithTimeout(context.Background(), timeoutInSeconds*time.Second)

0 commit comments

Comments
 (0)