Skip to content

Commit 43b7a35

Browse files
Support empty S3 queries
1 parent 6ea38d7 commit 43b7a35

File tree

5 files changed

+244
-17
lines changed

5 files changed

+244
-17
lines changed

common/archiver/s3store/util.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,15 +166,40 @@ func constructTimeBasedSearchKey(path, namespaceID, primaryIndexKey, primaryInde
166166
timeFormat = "2006-01-02T" + timeFormat
167167
}
168168

169-
return fmt.Sprintf("%s/%s", constructVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey), t.Format(timeFormat))
169+
return fmt.Sprintf(
170+
"%s/%s",
171+
constructIndexedVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey),
172+
t.Format(timeFormat),
173+
)
170174
}
171175

172176
func constructTimestampIndex(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey string, secondaryIndexValue time.Time, runID string) string {
173-
return fmt.Sprintf("%s/%s/%s", constructVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey), secondaryIndexValue.Format(time.RFC3339), runID)
177+
return fmt.Sprintf(
178+
"%s/%s/%s",
179+
constructIndexedVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey),
180+
secondaryIndexValue.Format(time.RFC3339),
181+
runID,
182+
)
174183
}
175184

176-
func constructVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexType string) string {
177-
return strings.TrimLeft(strings.Join([]string{path, namespaceID, "visibility", primaryIndexKey, primaryIndexValue, secondaryIndexType}, "/"), "/")
185+
func constructIndexedVisibilitySearchPrefix(
186+
path string,
187+
namespaceID string,
188+
primaryIndexKey string,
189+
primaryIndexValue string,
190+
secondaryIndexType string,
191+
) string {
192+
return strings.TrimLeft(
193+
strings.Join(
194+
[]string{path, namespaceID, "visibility", primaryIndexKey, primaryIndexValue, secondaryIndexType},
195+
"/",
196+
),
197+
"/",
198+
)
199+
}
200+
201+
func constructVisibilitySearchPrefix(path, namespaceID string) string {
202+
return strings.TrimLeft(strings.Join([]string{path, namespaceID, "visibility"}, "/"), "/")
178203
}
179204

180205
func ensureContextTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package s3store
26+
27+
import (
28+
"testing"
29+
30+
"github.com/stretchr/testify/assert"
31+
)
32+
33+
func TestConstructVisibilitySearchPrefix(t *testing.T) {
34+
t.Parallel()
35+
assert.Equal(
36+
t,
37+
constructVisibilitySearchPrefix(
38+
"path",
39+
"namespaceID",
40+
),
41+
"path/namespaceID/visibility",
42+
)
43+
}
44+
45+
func TestConstructIndexedVisibilitySearchPrefix(t *testing.T) {
46+
t.Parallel()
47+
assert.Equal(
48+
t,
49+
constructIndexedVisibilitySearchPrefix(
50+
"/path",
51+
"namespaceID",
52+
"primaryIndexKey",
53+
"primaryIndexValue",
54+
"secondaryIndexType",
55+
),
56+
"path/namespaceID/visibility/primaryIndexKey/primaryIndexValue/secondaryIndexType",
57+
)
58+
}

common/archiver/s3store/visibilityArchiver.go

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ package s3store
2626

2727
import (
2828
"context"
29+
"errors"
30+
"strings"
2931
"time"
3032

3133
"github.com/aws/aws-sdk-go/aws"
@@ -175,6 +177,9 @@ func (v *visibilityArchiver) Query(
175177
return nil, serviceerror.NewInvalidArgument(archiver.ErrInvalidURI.Error())
176178
}
177179

180+
if strings.TrimSpace(request.Query) == "" {
181+
return v.queryAll(ctx, URI, request, saTypeMap)
182+
}
178183
if err := archiver.ValidateQueryRequest(request); err != nil {
179184
return nil, serviceerror.NewInvalidArgument(archiver.ErrInvalidQueryVisibilityRequest.Error())
180185
}
@@ -197,34 +202,94 @@ func (v *visibilityArchiver) Query(
197202
)
198203
}
199204

205+
// queryAll returns all workflow executions in the archive.
206+
func (v *visibilityArchiver) queryAll(
207+
ctx context.Context,
208+
uri archiver.URI,
209+
request *archiver.QueryVisibilityRequest,
210+
saTypeMap searchattribute.NameTypeMap,
211+
) (*archiver.QueryVisibilityResponse, error) {
212+
if err := archiver.ValidateQueryRequest(request); err != nil && !errors.Is(err, archiver.ErrEmptyQuery) {
213+
return nil, serviceerror.NewInvalidArgument(archiver.ErrInvalidQueryVisibilityRequest.Error())
214+
}
215+
216+
return v.queryPrefix(ctx, uri, &queryVisibilityRequest{
217+
namespaceID: request.NamespaceID,
218+
pageSize: request.PageSize,
219+
nextPageToken: request.NextPageToken,
220+
parsedQuery: &parsedQuery{
221+
workflowTypeName: nil,
222+
workflowID: nil,
223+
startTime: nil,
224+
closeTime: nil,
225+
searchPrecision: nil,
226+
},
227+
}, saTypeMap, constructVisibilitySearchPrefix(uri.Path(), request.NamespaceID))
228+
}
229+
200230
func (v *visibilityArchiver) query(
201231
ctx context.Context,
202232
URI archiver.URI,
203233
request *queryVisibilityRequest,
204234
saTypeMap searchattribute.NameTypeMap,
205235
) (*archiver.QueryVisibilityResponse, error) {
206-
ctx, cancel := ensureContextTimeout(ctx)
207-
defer cancel()
208-
var token *string
209-
if request.nextPageToken != nil {
210-
token = deserializeQueryVisibilityToken(request.nextPageToken)
211-
}
212236
primaryIndex := primaryIndexKeyWorkflowTypeName
213237
primaryIndexValue := request.parsedQuery.workflowTypeName
214238
if request.parsedQuery.workflowID != nil {
215239
primaryIndex = primaryIndexKeyWorkflowID
216240
primaryIndexValue = request.parsedQuery.workflowID
217241
}
218-
var prefix = constructVisibilitySearchPrefix(URI.Path(), request.namespaceID, primaryIndex, *primaryIndexValue, secondaryIndexKeyCloseTimeout) + "/"
242+
243+
prefix := constructIndexedVisibilitySearchPrefix(
244+
URI.Path(),
245+
request.namespaceID,
246+
primaryIndex,
247+
*primaryIndexValue,
248+
secondaryIndexKeyCloseTimeout,
249+
) + "/"
219250
if request.parsedQuery.closeTime != nil {
220-
prefix = constructTimeBasedSearchKey(URI.Path(), request.namespaceID, primaryIndex, *primaryIndexValue, secondaryIndexKeyCloseTimeout, *request.parsedQuery.closeTime, *request.parsedQuery.searchPrecision)
251+
prefix = constructTimeBasedSearchKey(
252+
URI.Path(),
253+
request.namespaceID,
254+
primaryIndex,
255+
*primaryIndexValue,
256+
secondaryIndexKeyCloseTimeout,
257+
*request.parsedQuery.closeTime,
258+
*request.parsedQuery.searchPrecision,
259+
)
221260
}
222261
if request.parsedQuery.startTime != nil {
223-
prefix = constructTimeBasedSearchKey(URI.Path(), request.namespaceID, primaryIndex, *primaryIndexValue, secondaryIndexKeyStartTimeout, *request.parsedQuery.startTime, *request.parsedQuery.searchPrecision)
262+
prefix = constructTimeBasedSearchKey(
263+
URI.Path(),
264+
request.namespaceID,
265+
primaryIndex,
266+
*primaryIndexValue,
267+
secondaryIndexKeyStartTimeout,
268+
*request.parsedQuery.startTime,
269+
*request.parsedQuery.searchPrecision,
270+
)
224271
}
225272

273+
return v.queryPrefix(ctx, URI, request, saTypeMap, prefix)
274+
}
275+
276+
func (v *visibilityArchiver) queryPrefix(
277+
ctx context.Context,
278+
uri archiver.URI,
279+
request *queryVisibilityRequest,
280+
saTypeMap searchattribute.NameTypeMap,
281+
prefix string,
282+
) (*archiver.QueryVisibilityResponse, error) {
283+
ctx, cancel := ensureContextTimeout(ctx)
284+
defer cancel()
285+
286+
var token *string
287+
288+
if request.nextPageToken != nil {
289+
token = deserializeQueryVisibilityToken(request.nextPageToken)
290+
}
226291
results, err := v.s3cli.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
227-
Bucket: aws.String(URI.Hostname()),
292+
Bucket: aws.String(uri.Hostname()),
228293
Prefix: aws.String(prefix),
229294
MaxKeys: aws.Int64(int64(request.pageSize)),
230295
ContinuationToken: token,
@@ -244,7 +309,7 @@ func (v *visibilityArchiver) query(
244309
response.NextPageToken = serializeQueryVisibilityToken(*results.NextContinuationToken)
245310
}
246311
for _, item := range results.Contents {
247-
encodedRecord, err := Download(ctx, v.s3cli, URI, *item.Key)
312+
encodedRecord, err := Download(ctx, v.s3cli, uri, *item.Key)
248313
if err != nil {
249314
return nil, serviceerror.NewUnavailable(err.Error())
250315
}

common/archiver/s3store/visibilityArchiver_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/stretchr/testify/require"
4040
"github.com/stretchr/testify/suite"
4141
enumspb "go.temporal.io/api/enums/v1"
42+
"go.temporal.io/api/serviceerror"
4243

4344
"go.temporal.io/server/common/searchattribute"
4445

@@ -355,6 +356,84 @@ func (s *visibilityArchiverSuite) TestQuery_Success_SmallPageSize() {
355356
s.Equal(ei, response.Executions[0])
356357
}
357358

359+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_InvalidNamespace() {
360+
uri, err := archiver.NewURI(testBucketURI)
361+
s.NoError(err)
362+
arc := s.newTestVisibilityArchiver()
363+
req := &archiver.QueryVisibilityRequest{
364+
NamespaceID: "",
365+
PageSize: 1,
366+
NextPageToken: nil,
367+
Query: "",
368+
}
369+
_, err = arc.Query(context.Background(), uri, req, searchattribute.TestNameTypeMap)
370+
371+
var svcErr *serviceerror.InvalidArgument
372+
373+
s.ErrorAs(err, &svcErr)
374+
}
375+
376+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_ZeroPageSize() {
377+
uri, err := archiver.NewURI(testBucketURI)
378+
s.NoError(err)
379+
arc := s.newTestVisibilityArchiver()
380+
req := &archiver.QueryVisibilityRequest{
381+
NamespaceID: testNamespaceID,
382+
PageSize: 0,
383+
NextPageToken: nil,
384+
Query: "",
385+
}
386+
_, err = arc.Query(context.Background(), uri, req, searchattribute.TestNameTypeMap)
387+
388+
var svcErr *serviceerror.InvalidArgument
389+
390+
s.ErrorAs(err, &svcErr)
391+
}
392+
393+
func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_Pagination() {
394+
uri, err := archiver.NewURI(testBucketURI)
395+
s.NoError(err)
396+
397+
arc := s.newTestVisibilityArchiver()
398+
399+
response := &archiver.QueryVisibilityResponse{
400+
Executions: nil,
401+
NextPageToken: nil,
402+
}
403+
404+
limit := 10
405+
executions := make(map[string]*workflowpb.WorkflowExecutionInfo, limit)
406+
407+
for i := 0; i < limit; i++ {
408+
req := &archiver.QueryVisibilityRequest{
409+
NamespaceID: testNamespaceID,
410+
PageSize: 1,
411+
NextPageToken: response.NextPageToken,
412+
Query: "",
413+
}
414+
response, err = arc.Query(context.Background(), uri, req, searchattribute.TestNameTypeMap)
415+
s.NoError(err)
416+
s.NotNil(response)
417+
s.Len(response.Executions, 1)
418+
419+
if response.NextPageToken == nil {
420+
break
421+
}
422+
423+
for _, execution := range response.Executions {
424+
key := execution.Execution.GetWorkflowId() +
425+
"/" + execution.Execution.GetRunId() +
426+
"/" + execution.CloseTime.String()
427+
executions[key] = execution
428+
}
429+
430+
if len(executions) > 1 {
431+
return
432+
}
433+
}
434+
s.Fail("there should be at least 2 unique executions across all pages")
435+
}
436+
358437
type precisionTest struct {
359438
day int
360439
hour int

common/archiver/util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ var (
4141
errEmptyWorkflowTypeName = errors.New("field WorkflowTypeName is empty")
4242
errEmptyStartTime = errors.New("field StartTime is empty")
4343
errEmptyCloseTime = errors.New("field CloseTime is empty")
44-
errEmptyQuery = errors.New("field Query is empty")
44+
ErrEmptyQuery = errors.New("field Query is empty")
4545
)
4646

4747
// TagLoggerWithArchiveHistoryRequestAndURI tags logger with fields in the archive history request and the URI
@@ -144,7 +144,7 @@ func ValidateQueryRequest(request *QueryVisibilityRequest) error {
144144
return errInvalidPageSize
145145
}
146146
if request.Query == "" {
147-
return errEmptyQuery
147+
return ErrEmptyQuery
148148
}
149149
return nil
150150
}

0 commit comments

Comments
 (0)