Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -328,21 +328,20 @@ install-schema-mysql-pre5720: temporal-sql-tool

install-schema-mysql: temporal-sql-tool
@printf $(COLOR) "Install MySQL schema..."
./temporal-sql-tool --ep 127.0.0.1 create --db temporal
./temporal-sql-tool --ep 127.0.0.1 --db temporal setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 --db temporal update-schema -d ./schema/mysql/v57/temporal/versioned
./temporal-sql-tool --ep 127.0.0.1 create --db temporal_visibility
./temporal-sql-tool --ep 127.0.0.1 --db temporal_visibility setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 --db temporal_visibility update-schema -d ./schema/mysql/v57/visibility/versioned
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root create --db temporal
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root --db temporal setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root --db temporal update-schema -d ./schema/mysql/v57/temporal/versioned
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root create --db temporal_visibility
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root --db temporal_visibility setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -u root --pw root --db temporal_visibility update-schema -d ./schema/mysql/v57/visibility/versioned

install-schema-postgres: temporal-sql-tool
@printf $(COLOR) "Install Postgres schema..."
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres create --db temporal
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres --db temporal setup -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres --db temporal update-schema -d ./schema/postgres/temporal/versioned
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres create --db temporal_visibility
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres --db temporal_visibility setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw temporal --pl postgres --db temporal_visibility update-schema -d ./schema/postgres/visibility/versioned
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres --db temporal setup -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres --db temporal update-schema -d ./schema/postgres/temporal/versioned
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres create --db temporal_visibility
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres --db temporal_visibility setup-schema -v 0.0
./temporal-sql-tool --ep 127.0.0.1 -p 5432 -u temporal -pw temporal --pl postgres --db temporal_visibility update-schema -d ./schema/postgres/visibility/versioned

install-schema-cdc: temporal-cassandra-tool
@printf $(COLOR) "Set up temporal_active key space..."
Expand Down
32 changes: 21 additions & 11 deletions common/persistence/sql/sqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *sqlVisibilityStore) UpsertWorkflowExecution(request *p.InternalUpsertWo
}

func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(request *p.ListWorkflowExecutionsRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListOpenWorkflowExecutions", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListOpenWorkflowExecutions", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, false,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -132,7 +132,7 @@ func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(request *p.ListWorkflowE
}

func (s *sqlVisibilityStore) ListClosedWorkflowExecutions(request *p.ListWorkflowExecutionsRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListClosedWorkflowExecutions", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListClosedWorkflowExecutions", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, true,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -146,7 +146,7 @@ func (s *sqlVisibilityStore) ListClosedWorkflowExecutions(request *p.ListWorkflo
}

func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByType(request *p.ListWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByType", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByType", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, false,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -162,7 +162,7 @@ func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByType(request *p.ListWor
}

func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByType(request *p.ListWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByType", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByType", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, true,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -177,7 +177,7 @@ func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByType(request *p.ListW
}

func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(request *p.ListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, false,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -193,7 +193,7 @@ func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(request *p.L
}

func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(request *p.ListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, true,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand All @@ -208,7 +208,7 @@ func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(request *p
}

func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByStatus(request *p.ListClosedWorkflowExecutionsByStatusRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByStatus", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime,
return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByStatus", request.NextPageToken, request.EarliestStartTime, request.LatestStartTime, true,
func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
minStartTime := time.Unix(0, request.EarliestStartTime)
return s.db.SelectFromVisibility(&sqlplugin.VisibilityFilter{
Expand Down Expand Up @@ -287,7 +287,13 @@ func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.Visibili
return info
}

func (s *sqlVisibilityStore) listWorkflowExecutions(opName string, pageToken []byte, earliestTime int64, latestTime int64, selectOp func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error)) (*p.InternalListWorkflowExecutionsResponse, error) {
func (s *sqlVisibilityStore) listWorkflowExecutions(
opName string,
pageToken []byte,
earliestTime int64,
latestTime int64,
closeQuery bool,
selectOp func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error)) (*p.InternalListWorkflowExecutionsResponse, error) {
var readLevel *visibilityPageToken
var err error
if len(pageToken) > 0 {
Expand All @@ -312,10 +318,14 @@ func (s *sqlVisibilityStore) listWorkflowExecutions(opName string, pageToken []b
}
var nextPageToken []byte
lastRow := rows[len(rows)-1]
lastStartTime := lastRow.StartTime
if lastStartTime.Sub(time.Unix(0, earliestTime)).Nanoseconds() > 0 {
lastTime := lastRow.StartTime
if closeQuery {
lastTime = *lastRow.CloseTime
}

if lastTime.Sub(time.Unix(0, earliestTime)).Nanoseconds() > 0 {
nextPageToken, err = s.serializePageToken(&visibilityPageToken{
Time: lastStartTime,
Time: lastTime,
RunID: lastRow.RunID,
})
if err != nil {
Expand Down
25 changes: 18 additions & 7 deletions common/persistence/sql/sqlplugin/mysql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,16 @@ const (
templateConditions = ` AND namespace_id = ?
AND start_time >= ?
AND start_time <= ?
AND (run_id > ? OR start_time < ?)
AND ((run_id > ? and start_time = ?) OR (start_time < ?))
ORDER BY start_time DESC, run_id
LIMIT ?`
LIMIT ?`

templateConditionsClosedWorkflows = ` AND namespace_id = ?
AND close_time >= ?
AND close_time <= ?
AND ((run_id > ? and close_time = ?) OR (close_time < ?))
ORDER BY close_time DESC, run_id
LIMIT ?`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE status = 1 `
Expand All @@ -57,17 +64,17 @@ const (

templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions

templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditions
templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditionsClosedWorkflows

templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = ?` + templateConditions

templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = ?` + templateConditions
templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = ?` + templateConditionsClosedWorkflows

templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = ?` + templateConditions

templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = ?` + templateConditions
templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = ?` + templateConditionsClosedWorkflows

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = ?` + templateConditions
templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = ?` + templateConditionsClosedWorkflows

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, status, history_length
FROM executions_visibility
Expand Down Expand Up @@ -153,7 +160,8 @@ func (mdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
mdb.converter.ToMySQLDateTime(*filter.MinStartTime),
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.RunID,
*filter.MinStartTime,
*filter.MaxStartTime,
Copy link
Member Author

@mastermanu mastermanu Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shawnhathaway - this also seemed like a bug to me. can you confirm that this change makes sense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea this does look like a bug as it doesn't appear MinStartTime is even affected by the paging token and this would break the contract of returning earlier records than specified with the filter it appears.

*filter.MaxStartTime,
*filter.PageSize)
case filter.MinStartTime != nil && filter.WorkflowTypeName != nil:
qry := templateGetOpenWorkflowExecutionsByType
Expand All @@ -168,6 +176,7 @@ func (mdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.RunID,
*filter.MaxStartTime,
*filter.MaxStartTime,
*filter.PageSize)
case filter.MinStartTime != nil && filter.Status != 0 && filter.Status != 1: // 0 is UNSPECIFIED, 1 is RUNNING
err = mdb.conn.Select(&rows,
Expand All @@ -178,6 +187,7 @@ func (mdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.RunID,
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.PageSize)
case filter.MinStartTime != nil:
qry := templateGetOpenWorkflowExecutions
Expand All @@ -191,6 +201,7 @@ func (mdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.RunID,
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
mdb.converter.ToMySQLDateTime(*filter.MaxStartTime),
*filter.PageSize)
default:
return nil, fmt.Errorf("invalid query filter")
Expand Down
34 changes: 26 additions & 8 deletions common/persistence/sql/sqlplugin/postgres/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,31 @@ const (
templateConditions1 = ` AND namespace_id = $1
AND start_time >= $2
AND start_time <= $3
AND (run_id > $4 OR start_time < $5)
AND ((run_id > $4 and start_time = $5) OR (start_time < $6))
ORDER BY start_time DESC, run_id
LIMIT $6`
LIMIT $7`

templateConditions2 = ` AND namespace_id = $2
AND start_time >= $3
AND start_time <= $4
AND (run_id > $5 OR start_time < $6)
AND ((run_id > $5 and start_time = $6) OR (start_time < $7))
ORDER BY start_time DESC, run_id
LIMIT $8`

templateConditionsClosedWorkflow1 = ` AND namespace_id = $1
AND close_time >= $2
AND close_time <= $3
AND ((run_id > $4 and close_time = $5) OR (close_time < $6))
ORDER BY close_time DESC, run_id
LIMIT $7`

templateConditionsClosedWorkflow2 = ` AND namespace_id = $2
AND close_time >= $3
AND close_time <= $4
AND ((run_id > $5 and close_time = $6) OR (close_time < $7))
ORDER BY close_time DESC, run_id
LIMIT $8`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE status = 1 `

Expand All @@ -76,17 +90,17 @@ const (

templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions1

templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditions1
templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditionsClosedWorkflow1

templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = $1` + templateConditions2

templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = $1` + templateConditions2
templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = $1` + templateConditionsClosedWorkflow2

templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = $1` + templateConditions2

templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = $1` + templateConditions2
templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = $1` + templateConditionsClosedWorkflow2

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = $1` + templateConditions2
templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = $1` + templateConditionsClosedWorkflow2

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, status, history_length
FROM executions_visibility
Expand Down Expand Up @@ -172,7 +186,8 @@ func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
pdb.converter.ToPostgresDateTime(*filter.MinStartTime),
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
*filter.RunID,
*filter.MinStartTime,
*filter.MaxStartTime,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shawnhathaway - ditto here

*filter.MaxStartTime,
*filter.PageSize)
case filter.MinStartTime != nil && filter.WorkflowTypeName != nil:
qry := templateGetOpenWorkflowExecutionsByType
Expand All @@ -187,6 +202,7 @@ func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
*filter.RunID,
*filter.MaxStartTime,
*filter.MaxStartTime,
*filter.PageSize)
case filter.MinStartTime != nil && filter.Status != 0 && filter.Status != 1: // 0 is UNSPECIFIED, 1 is RUNNING
err = pdb.conn.Select(&rows,
Expand All @@ -197,6 +213,7 @@ func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
*filter.RunID,
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
pdb.converter.ToPostgresDateTime(*filter.MaxStartTime),
*filter.PageSize)
case filter.MinStartTime != nil:
qry := templateGetOpenWorkflowExecutions
Expand All @@ -212,6 +229,7 @@ func (pdb *db) SelectFromVisibility(filter *sqlplugin.VisibilityFilter) ([]sqlpl
maxSt,
*filter.RunID,
maxSt,
maxSt,
*filter.PageSize)
default:
return nil, fmt.Errorf("invalid query filter")
Expand Down
4 changes: 2 additions & 2 deletions config/development_postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ persistence:
databaseName: "temporal"
connectAddr: "127.0.0.1:5432"
connectProtocol: "tcp"
user: "postgres"
user: "temporal"
password: "temporal"
maxConns: 20
maxIdleConns: 20
Expand All @@ -20,7 +20,7 @@ persistence:
databaseName: "temporal_visibility"
connectAddr: "127.0.0.1:5432"
connectProtocol: "tcp"
user: "postgres"
user: "temporal"
password: "temporal"
maxConns: 2
maxIdleConns: 2
Expand Down
9 changes: 6 additions & 3 deletions schema/mysql/v57/visibility/schema.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE executions_visibility (
namespace_id CHAR(64) NOT NULL,
namespace_id CHAR(64) NOT NULL,
run_id CHAR(64) NOT NULL,
start_time DATETIME(6) NOT NULL,
execution_time DATETIME(6) NOT NULL,
Expand All @@ -10,11 +10,14 @@ CREATE TABLE executions_visibility (
history_length BIGINT,
memo BLOB,
encoding VARCHAR(64) NOT NULL,
task_queue VARCHAR(255) DEFAULT '' NOT NULL,
task_queue VARCHAR(255) DEFAULT '' NOT NULL,

PRIMARY KEY (namespace_id, run_id)
);

CREATE INDEX by_type_start_time ON executions_visibility (namespace_id, workflow_type_name, status, start_time DESC, run_id);
CREATE INDEX by_workflow_id_start_time ON executions_visibility (namespace_id, workflow_id, status, start_time DESC, run_id);
CREATE INDEX by_status_by_close_time ON executions_visibility (namespace_id, status, start_time DESC, run_id);
CREATE INDEX by_status_by_start_time ON executions_visibility (namespace_id, status, start_time DESC, run_id);
CREATE INDEX by_type_close_time ON executions_visibility (namespace_id, workflow_type_name, status, close_time DESC, run_id);
CREATE INDEX by_workflow_id_close_time ON executions_visibility (namespace_id, workflow_id, status, close_time DESC, run_id);
CREATE INDEX by_status_by_close_time ON executions_visibility (namespace_id, status, close_time DESC, run_id);
Loading