Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions common/archiver/gcloud/queryParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ package gcloud
import (
"errors"
"fmt"
"strconv"
"time"

"github.com/xwb1989/sqlparser"
Expand All @@ -48,8 +47,8 @@ type (
parsedQuery struct {
workflowID *string
workflowType *string
startTime int64
closeTime int64
startTime time.Time
closeTime time.Time
searchPrecision *string
runID *string
emptyResult bool
Expand Down Expand Up @@ -96,7 +95,7 @@ func (p *queryParser) Parse(query string) (*parsedQuery, error) {
return nil, err
}

if (parsedQuery.closeTime == 0 && parsedQuery.startTime == 0) || (parsedQuery.closeTime != 0 && parsedQuery.startTime != 0) {
if (parsedQuery.closeTime.IsZero() && parsedQuery.startTime.IsZero()) || (!parsedQuery.closeTime.IsZero() && !parsedQuery.startTime.IsZero()) {
return nil, errors.New("Requires a StartTime or CloseTime")
}

Expand Down Expand Up @@ -176,24 +175,24 @@ func (p *queryParser) convertComparisonExpr(compExpr *sqlparser.ComparisonExpr,
}
parsedQuery.runID = convert.StringPtr(val)
case CloseTime:
timestamp, err := convertToTimestamp(valStr)
closeTime, err := convertToTime(valStr)
if err != nil {
return err
}
if op != "=" {
return fmt.Errorf("only operation = is support for %s", CloseTime)
}
parsedQuery.closeTime = timestamp
parsedQuery.closeTime = closeTime

case StartTime:
timestamp, err := convertToTimestamp(valStr)
startTime, err := convertToTime(valStr)
if err != nil {
return err
}
if op != "=" {
return fmt.Errorf("only operation = is support for %s", CloseTime)
}
parsedQuery.startTime = timestamp
parsedQuery.startTime = startTime
case WorkflowType:
val, err := extractStringValue(valStr)
if err != nil {
Expand Down Expand Up @@ -249,20 +248,16 @@ func (p *queryParser) convertCloseTime(timestamp int64, op string, parsedQuery *
return nil
}

func convertToTimestamp(timeStr string) (int64, error) {
timestamp, err := strconv.ParseInt(timeStr, 10, 64)
if err == nil {
return timestamp, nil
}
func convertToTime(timeStr string) (time.Time, error) {
timestampStr, err := extractStringValue(timeStr)
if err != nil {
return 0, err
return time.Time{}, err
}
parsedTime, err := time.Parse(defaultDateTimeFormat, timestampStr)
if err != nil {
return 0, err
return time.Time{}, err
}
return parsedTime.UnixNano(), nil
return parsedTime, nil
}

func extractStringValue(s string) (string, error) {
Expand Down
3 changes: 1 addition & 2 deletions common/archiver/gcloud/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ func constructVisibilityFilenamePrefix(namespaceID, tag string) string {
return fmt.Sprintf("%s/%s", namespaceID, tag)
}

func constructTimeBasedSearchKey(namespaceID, tag string, timestamp int64, precision string) string {
t := time.Unix(0, timestamp).UTC()
func constructTimeBasedSearchKey(namespaceID, tag string, t time.Time, precision string) string {
var timeFormat = ""
switch precision {
case PrecisionSecond:
Expand Down
3 changes: 2 additions & 1 deletion common/archiver/gcloud/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func (s *utilSuite) TestConstructVisibilityFilenamePrefix() {
}

func (s *utilSuite) TestConstructTimeBasedSearchKey() {
s.Equal("namespaceID/startTimeout_1970-01-01T", constructTimeBasedSearchKey("namespaceID", indexKeyStartTimeout, 1580819141, "Day"))
t, _ := time.Parse(time.RFC3339, "2019-10-04T11:00:00+00:00")
s.Equal("namespaceID/startTimeout_2019-10-04T", constructTimeBasedSearchKey("namespaceID", indexKeyStartTimeout, t, "Day"))
}

func (s *utilSuite) TestConstructVisibilityFilename() {
Expand Down
4 changes: 2 additions & 2 deletions common/archiver/gcloud/visibilityArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ func (v *visibilityArchiver) query(ctx context.Context, URI archiver.URI, reques
}

var prefix = constructVisibilityFilenamePrefix(request.namespaceID, indexKeyCloseTimeout)
if request.parsedQuery.closeTime != 0 {
if !request.parsedQuery.closeTime.IsZero() {
prefix = constructTimeBasedSearchKey(request.namespaceID, indexKeyCloseTimeout, request.parsedQuery.closeTime, *request.parsedQuery.searchPrecision)
}
if request.parsedQuery.startTime != 0 {
if !request.parsedQuery.startTime.IsZero() {
prefix = constructTimeBasedSearchKey(request.namespaceID, indexKeyStartTimeout, request.parsedQuery.startTime, *request.parsedQuery.searchPrecision)
}

Expand Down
13 changes: 9 additions & 4 deletions common/archiver/gcloud/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -232,9 +233,11 @@ func (s *visibilityArchiverSuite) TestQuery_Fail_InvalidToken() {
defer mockCtrl.Finish()

mockParser := NewMockQueryParser(mockCtrl)
startTime, _ := time.Parse(time.RFC3339, "2019-10-04T11:00:00+00:00")
closeTime := startTime.Add(time.Hour)
mockParser.EXPECT().Parse(gomock.Any()).Return(&parsedQuery{
closeTime: int64(101),
startTime: int64(1),
closeTime: closeTime,
startTime: startTime,
}, nil)
visibilityArchiver.queryParser = mockParser
request := &archiver.QueryVisibilityRequest{
Expand Down Expand Up @@ -264,8 +267,9 @@ func (s *visibilityArchiverSuite) TestQuery_Success_NoNextPageToken() {

mockParser := NewMockQueryParser(mockCtrl)
dayPrecision := string("Day")
closeTime, _ := time.Parse(time.RFC3339, "2019-10-04T11:00:00+00:00")
mockParser.EXPECT().Parse(gomock.Any()).Return(&parsedQuery{
closeTime: int64(101),
closeTime: closeTime,
searchPrecision: &dayPrecision,
workflowType: convert.StringPtr("MobileOnlyWorkflow::processMobileOnly"),
workflowID: convert.StringPtr(testWorkflowID),
Expand Down Expand Up @@ -307,8 +311,9 @@ func (s *visibilityArchiverSuite) TestQuery_Success_SmallPageSize() {

mockParser := NewMockQueryParser(mockCtrl)
dayPrecision := "Day"
closeTime, _ := time.Parse(time.RFC3339, "2019-10-04T11:00:00+00:00")
mockParser.EXPECT().Parse(gomock.Any()).Return(&parsedQuery{
closeTime: int64(101),
closeTime: closeTime,
searchPrecision: &dayPrecision,
workflowType: convert.StringPtr("MobileOnlyWorkflow::processMobileOnly"),
workflowID: convert.StringPtr(testWorkflowID),
Expand Down
5 changes: 2 additions & 3 deletions common/archiver/s3store/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ func constructTimeBasedSearchKey(path, namespaceID, primaryIndexKey, primaryInde
return fmt.Sprintf("%s/%s", constructVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey), t.Format(timeFormat))
}

func constructTimestampIndex(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey string, timestamp int64, runID string) string {
t := time.Unix(0, timestamp).UTC()
return fmt.Sprintf("%s/%s/%s", constructVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey), t.Format(time.RFC3339), runID)
func constructTimestampIndex(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey string, secondaryIndexValue time.Time, runID string) string {
return fmt.Sprintf("%s/%s/%s", constructVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexKey), secondaryIndexValue.Format(time.RFC3339), runID)
}

func constructVisibilitySearchPrefix(path, namespaceID, primaryIndexKey, primaryIndexValue, secondaryIndexType string) string {
Expand Down
11 changes: 6 additions & 5 deletions common/archiver/s3store/visibilityArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package s3store

import (
"context"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -59,7 +60,7 @@ type (
primaryIndex string
primaryIndexValue string
secondaryIndex string
secondaryIndexTimestamp int64
secondaryIndexTimestamp time.Time
}
)

Expand Down Expand Up @@ -154,10 +155,10 @@ func (v *visibilityArchiver) Archive(
}
func createIndexesToArchive(request *archiverspb.ArchiveVisibilityRequest) []indexToArchive {
return []indexToArchive{
{primaryIndexKeyWorkflowTypeName, request.WorkflowTypeName, secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime).UnixNano()},
{primaryIndexKeyWorkflowTypeName, request.WorkflowTypeName, secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime).UnixNano()},
{primaryIndexKeyWorkflowID, request.GetWorkflowId(), secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime).UnixNano()},
{primaryIndexKeyWorkflowID, request.GetWorkflowId(), secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime).UnixNano()},
{primaryIndexKeyWorkflowTypeName, request.WorkflowTypeName, secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime)},
{primaryIndexKeyWorkflowTypeName, request.WorkflowTypeName, secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime)},
{primaryIndexKeyWorkflowID, request.GetWorkflowId(), secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime)},
{primaryIndexKeyWorkflowID, request.GetWorkflowId(), secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime)},
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/archiver/s3store/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (s *visibilityArchiverSuite) TestArchive_Success() {
err = visibilityArchiver.Archive(context.Background(), URI, request)
s.NoError(err)

expectedKey := constructTimestampIndex(URI.Path(), testNamespaceID, primaryIndexKeyWorkflowID, testWorkflowID, secondaryIndexKeyCloseTimeout, closeTimestamp.UnixNano(), testRunID)
expectedKey := constructTimestampIndex(URI.Path(), testNamespaceID, primaryIndexKeyWorkflowID, testWorkflowID, secondaryIndexKeyCloseTimeout, timestamp.TimeValue(closeTimestamp), testRunID)
data, err := download(context.Background(), visibilityArchiver.s3cli, URI, expectedKey)
s.NoError(err, expectedKey)

Expand Down